[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=334508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334508 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 26/Oct/19 09:38
Start Date: 26/Oct/19 09:38
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 334508)
Time Spent: 12h 20m (was: 12h 10m)

> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
> Key: BEAM-5775
> URL: https://issues.apache.org/jira/browse/BEAM-5775
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Mike Kaplinskiy
> Assignee: Mike Kaplinskiy
> Priority: Minor
> Time Spent: 12h 20m
> Remaining Estimate: 0h
>
> Currently, for storage level MEMORY_ONLY, Beam does not coder-ify the data. This lets Spark keep the data in memory, avoiding the serialization round trip. Unfortunately, the logic is fairly coarse: as soon as you switch to MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have chosen to keep the data in memory, incurring the serialization overhead.
>
> Ideally, Beam would serialize the data lazily, as Spark chooses to spill to disk. This would be a change in behavior when using Beam, but luckily Spark has a solution for folks who want data serialized in memory: MEMORY_AND_DISK_SER will keep the data serialized.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
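The "serialize lazily" idea in the issue description can be sketched in plain Java: hold the live object in memory, and pay the encoding cost only if Java serialization is actually invoked, i.e. only when the framework decides the object must leave memory. This is a hypothetical, simplified stand-in, not Beam's implementation: `LazyHolder` and its use of plain `java.io` serialization are illustrative substitutes for what the PR does with Beam `Coder`s.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch (not Beam code): keep the live object, encode lazily.
final class LazyHolder<T extends Serializable> implements Serializable {
  private static final long serialVersionUID = 1L;

  private transient T value; // live object; never written directly
  private byte[] bytes;      // encoded form, produced lazily

  LazyHolder(T value) {
    this.value = value;
  }

  /** Returns the live object, decoding it on first access after deserialization. */
  T get() {
    if (value == null && bytes != null) {
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        @SuppressWarnings("unchecked")
        T decoded = (T) in.readObject();
        value = decoded;
      } catch (IOException | ClassNotFoundException e) {
        throw new IllegalStateException("Error decoding lazily held value", e);
      }
    }
    return value;
  }

  // Called by Java serialization: the encoding cost is paid only here,
  // i.e. only if the holder is actually spilled or shipped.
  private void writeObject(ObjectOutputStream out) throws IOException {
    if (bytes == null) {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(buf)) {
        oos.writeObject(value);
      }
      bytes = buf.toByteArray();
    }
    out.defaultWriteObject(); // writes only the non-transient 'bytes' field
  }
}
```

Under this scheme a holder that is never serialized never encodes its value, which is exactly the behavior the issue asks for under MEMORY_AND_DISK: in-memory blocks stay live objects, and only spilled blocks get encoded.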
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=334507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334507 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 26/Oct/19 09:38
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-546587395

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---
Worklog Id: (was: 334507)
Time Spent: 12h 10m (was: 12h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=330906&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330906 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 19/Oct/19 09:08
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-544118190

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 330906)
Time Spent: 12h (was: 11h 50m)
Fix For: 2.17.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=297678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297678 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 20/Aug/19 08:13
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-522905986

Hello @mikekap, sorry for the late review cycle. This one is the only unmerged PR for the Spark runner at the moment; however, it needs a rebase. Any chance you can update it so we can run the performance tests again and see if it is OK to go? Thanks.

Issue Time Tracking
---
Worklog Id: (was: 297678)
Time Spent: 11h 40m (was: 11.5h)
Fix For: 2.16.0

-- This message was sent by Atlassian Jira (v8.3.2#803003)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=297679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-297679 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 20/Aug/19 08:13
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-522906258

Adding @RyanSkraba as co-reviewer in the meantime.

Issue Time Tracking
---
Worklog Id: (was: 297679)
Time Spent: 11h 50m (was: 11h 40m)
Fix For: 2.16.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=270939&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-270939 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 02/Jul/19 14:43
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-507708827

This PR is not yet 'stale'; review of the performance implications is still ongoing.

Issue Time Tracking
---
Worklog Id: (was: 270939)
Time Spent: 11.5h (was: 11h 20m)
Fix For: 2.15.0

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=270862&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-270862 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 02/Jul/19 13:16
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-507672018

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs.

Issue Time Tracking
---
Worklog Id: (was: 270862)
Time Spent: 11h 20m (was: 11h 10m)
Fix For: 2.15.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236837&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236837 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:52
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489084667

This looks like an open file limit issue, but I still wonder why this changed because of this PR; some Spark internals, maybe.

Issue Time Tracking
---
Worklog Id: (was: 236837)
Time Spent: 11h 10m (was: 11h)
Fix For: 2.13.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236836&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236836 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:52
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489084667

This looks like an open file limit issue, but I still wonder why this changed because of this PR; some Spark internals, maybe.

Issue Time Tracking
---
Worklog Id: (was: 236836)
Time Spent: 11h (was: 10h 50m)
Fix For: 2.13.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236835&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236835 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:43
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489081256

Could you notice any performance improvement with this PR? I like it for consistency, but I have found improvements and regressions depending on the pipelines. I also have a weird issue with this one, @mikekap: I was running Nexmark to see if I could find considerable improvements due to this PR, but when I invoke it multiple times it fails; curiously, this does not happen with current master, for example. Would you mind taking a look to see if maybe it is some configuration? It is strange.

```bash
./gradlew :beam-sdks-java-nexmark:run \
  -Pnexmark.runner=":beam-runners-spark" \
  -Pnexmark.args="
    --runner=SparkRunner
    --suite=SMOKE
    --streamTimeout=60
    --streaming=false
    --manageResources=false
    --monitorJobs=true"
```

The exception log:

```
== Run started 2019-05-03T12:36:02.235Z and ran for PT42.696S

Default configuration:
{"debug":true,"query":null,"sourceType":"DIRECT","sinkType":"DEVNULL","exportSummaryToBigQuery":false,"pubSubMode":"COMBINED","sideInputType":"DIRECT","sideInputRowCount":500,"sideInputNumShards":3,"sideInputUrl":null,"sessionGap":{"standardDays":0,"standardHours":0,"standardMinutes":10,"standardSeconds":600,"millis":60},"numEvents":10,"numEventGenerators":100,"rateShape":"SINE","firstEventRate":1,"nextEventRate":1,"rateUnit":"PER_SECOND","ratePeriodSec":600,"preloadSeconds":0,"streamTimeout":240,"isRateLimited":false,"useWallclockEventTime":false,"avgPersonByteSize":200,"avgAuctionByteSize":500,"avgBidByteSize":100,"hotAuctionRatio":2,"hotSellersRatio":4,"hotBiddersRatio":4,"windowSizeSec":10,"windowPeriodSec":5,"watermarkHoldbackSec":0,"numInFlightAuctions":100,"numActivePeople":1000,"coderStrategy":"HAND","cpuDelayMs":0,"diskBusyBytes":0,"auctionSkip":123,"fanout":5,"maxAuctionsWaitingTime":600,"occasionalDelaySec":3,"probDelayedEvent":0.1,"maxLogEvents":10,"usePubsubPublishTime":false,"outOfOrderGroupSize":1}

Configurations:
  Conf  Description
  0000  query:PASSTHROUGH; streamTimeout:60
  0001  query:CURRENCY_CONVERSION; streamTimeout:60
  0002  query:SELECTION; streamTimeout:60
  0003  query:LOCAL_ITEM_SUGGESTION; streamTimeout:60
  0004  query:AVERAGE_PRICE_FOR_CATEGORY; numEvents:1; streamTimeout:60
  0005  query:HOT_ITEMS; streamTimeout:60
  0006  query:AVERAGE_SELLING_PRICE_BY_SELLER; numEvents:1; streamTimeout:60
  0007  query:HIGHEST_BID; streamTimeout:60
  0008  query:MONITOR_NEW_USERS; streamTimeout:60
  0009  query:WINNING_BIDS; numEvents:1; streamTimeout:60
  0010  query:LOG_TO_SHARDED_FILES; streamTimeout:60
  0011  query:USER_SESSIONS; streamTimeout:60
  0012  query:PROCESSING_TIME_WINDOWS; streamTimeout:60
  0013  query:BOUNDED_SIDE_INPUT_JOIN; streamTimeout:60
  0014  query:SESSION_SIDE_INPUT_JOIN; streamTimeout:60

Performance:
  Conf  Runtime(sec)(Baseline)  Events(/sec)(Baseline)  Results(Baseline)
  0000           1.5                   66093.9                 10
  0001           0.8                  130378.1              92000
  0002           0.3                  325732.9                351
  0003           2.3                   43327.6                580
  0004           0.9                   11402.5                 40
  0005           2.0                   48947.6                 12
  0006           0.5                   19267.8                103
  0007           2.4                   40833.0                  1
  0008           1.6                   63775.5               6000
  0009           0.5                   20120.7                298
  0010           0.9                  114155.3                  1
  0011           1.0                   97371.0               1919
  0012           0.9                  109769.5               1919
  0013
```
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236832&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236832 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:32
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489079463

Run Spark Runner Nexmark Tests

Issue Time Tracking
---
Worklog Id: (was: 236832)
Time Spent: 10.5h (was: 10h 20m)
Fix For: 2.13.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236833&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236833 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:39
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489081256

Issue Time Tracking
---
Worklog Id: (was: 236833)
Time Spent: 10h 40m (was: 10.5h)
Fix For: 2.13.0
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=236831&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-236831 ]

ASF GitHub Bot logged work on BEAM-5775:
Created on: 03/May/19 12:30
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-489078994

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 236831)
Time Spent: 10h 20m (was: 10h 10m)
Fix For: 2.13.0
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=233440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-233440 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 26/Apr/19 10:52
Start Date: 26/Apr/19 10:52
Worklog Time Spent: 10m

Work Description: VaclavPlajt commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r278900074

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ValueAndCoderLazySerializable.java

@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.coders;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.ByteStreams;
+
+/**
+ * A holder object that lets you serialize an element with a Coder with minimal wasted space.
+ * Supports both Kryo and Java serialization.
+ *
+ * There are two different representations: a deserialized representation and a serialized
+ * representation.
+ *
+ * The deserialized representation stores a Coder and the value. To serialize the value, we write
+ * a length-prefixed encoding of value, but do NOT write the Coder used.
+ *
+ * The serialized representation just reads a byte array - the value is not deserialized fully.
+ * In order to get at the deserialized value, the caller must pass the Coder used to create this
+ * instance via getOrDecode(Coder). This reverts the representation back to the deserialized
+ * representation.
+ *
+ * @param <T> element type
+ */
+public final class ValueAndCoderLazySerializable<T> implements Serializable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  private ValueAndCoderLazySerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  ValueAndCoderLazySerializable() {}
+
+  public static <T> ValueAndCoderLazySerializable<T> of(T value, Coder<T> coder) {
+    return new ValueAndCoderLazySerializable<>(value, coder);
+  }
+
+  public T getOrDecode(Coder<T> coder) {
+    if (!(coderOrBytes instanceof Coder)) {
+      ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) this.coderOrBytes);
+      try {
+        value = coder.decode(bais);
+      } catch (IOException e) {
+        throw new IllegalStateException("Error decoding bytes for coder: " + coder, e);
+      }
+      this.coderOrBytes = coder;
+    }
+
+    return value;
+  }
+
+  private static class ByteSizeObserver extends ElementByteSizeObserver {
+    private long observedSize = 0;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedSize += elementByteSize;
+    }
+  }
+
+  void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      @SuppressWarnings("unchecked")
+      Coder<T> coder = (Coder<T>) coderOrBytes;
+      int bufferSize = 1024;
+
+      if (coder.isRegisterByteSizeObserverCheap(value)) {
+        try {
+          ByteSizeObserver observer = new ByteSizeObserver();
+          coder.registerByteSizeObserver(value, observer);

Review comment: :clap:
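The two-representation scheme described in the excerpt above can be sketched with only the JDK. This is a minimal illustration, not Beam code: `LazyHolder` and `SimpleCoder` are invented stand-ins for `ValueAndCoderLazySerializable` and Beam's `Coder`, and the "coder" here is just UTF-8. The key move is the single `coderOrBytes` field that holds either the coder (deserialized form) or the encoded bytes (serialized form), with the caller re-supplying the coder on decode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class LazyHolder<T> {
  // Stand-in for Beam's Coder<T>: knows how to turn T into bytes and back.
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  private T value;
  // Either a SimpleCoder (deserialized representation) or a byte[] (serialized one).
  private Object coderOrBytes;

  LazyHolder(T value, SimpleCoder<T> coder) {
    this.value = value;
    this.coderOrBytes = coder;
  }

  // Encode on demand; note the coder itself is never written out.
  byte[] toBytes() throws IOException {
    if (coderOrBytes instanceof byte[]) {
      return (byte[]) coderOrBytes;
    }
    @SuppressWarnings("unchecked")
    SimpleCoder<T> coder = (SimpleCoder<T>) coderOrBytes;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    coder.encode(value, baos);
    return baos.toByteArray();
  }

  // Simulate arriving on the other side holding only bytes.
  static <T> LazyHolder<T> fromBytes(byte[] bytes) {
    LazyHolder<T> holder = new LazyHolder<>(null, null);
    holder.coderOrBytes = bytes;
    return holder;
  }

  // The caller re-supplies the coder, exactly as getOrDecode(Coder) does above.
  T getOrDecode(SimpleCoder<T> coder) throws IOException {
    if (!(coderOrBytes instanceof SimpleCoder)) {
      value = coder.decode(new ByteArrayInputStream((byte[]) coderOrBytes));
      coderOrBytes = coder; // flip back to the deserialized representation
    }
    return value;
  }

  // A trivial "coder" for Strings, for demonstration only.
  static final SimpleCoder<String> UTF8 = new SimpleCoder<String>() {
    public void encode(String v, OutputStream out) throws IOException {
      out.write(v.getBytes(StandardCharsets.UTF_8));
    }
    public String decode(InputStream in) throws IOException {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  };

  public static String roundTrip(String s) throws IOException {
    LazyHolder<String> sender = new LazyHolder<>(s, UTF8);
    LazyHolder<String> receiver = fromBytes(sender.toBytes());
    return receiver.getOrDecode(UTF8);
  }

  public static void main(String[] args) throws IOException {
    if (!roundTrip("hello").equals("hello")) throw new AssertionError();
    System.out.println("ok");
  }
}
```

Because the field flips back to the coder after `getOrDecode`, repeated reads decode at most once - which is what makes the holder cheap when Spark never spills.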
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=233437&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-233437 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 26/Apr/19 10:47
Start Date: 26/Apr/19 10:47
Worklog Time Spent: 10m

Work Description: VaclavPlajt commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r278898397

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,203 @@
[Apache License 2.0 header identical to the excerpt above]
+package org.apache.beam.runners.spark.translation;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
[class javadoc identical to ValueAndCoderLazySerializable above]
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T getOrDecode(Coder<T> coder) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      value =
+          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);
+      this.coderOrBytes = coder;
+    }
+
+    return value;
+  }
+
+  private static class ByteSizeObserver extends ElementByteSizeObserver {
+    private long observedSize = 0;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedSize += elementByteSize;
+    }
+  }
+
+  private void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      @SuppressWarnings("unchecked")
+      Coder<T> coder = (Coder<T>) coderOrBytes;
+      int bufferSize = 1024;
+
+      if (coder.isRegisterByteSizeObserverCheap(value)) {
+        try {
+          ByteSizeObserver observer = new ByteSizeObserver();
+          coder.registerByteSizeObserver(value, observer);
+          bufferSize = (int) observer.observedSize;
+        } catch (Exception e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream(bufferSize);
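The buffer pre-sizing step at the end of the excerpt - asking the coder for a cheap byte-size estimate and allocating the `ByteArrayOutputStream` at exactly that size, so the array never has to grow and be copied - can be sketched in isolation. This is an illustration only: `PresizedEncode` is an invented name, and the "cheap observer" is just a fixed-width calculation rather than Beam's `ElementByteSizeObserver`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class PresizedEncode {
  // Encode an int[] as 4 big-endian bytes per element, into a buffer pre-sized
  // from a cheap estimate -- mirroring how writeCommon() sizes its
  // ByteArrayOutputStream from the ByteSizeObserver before encoding.
  public static byte[] encode(int[] values) {
    int estimatedSize = 4 * values.length; // the "byte size observer" is cheap here
    ByteArrayOutputStream out = new ByteArrayOutputStream(estimatedSize);
    ByteBuffer scratch = ByteBuffer.allocate(4);
    for (int v : values) {
      scratch.clear();
      scratch.putInt(v);
      out.write(scratch.array(), 0, 4); // never triggers an internal array resize
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] encoded = encode(new int[] {1, 2, 3});
    if (encoded.length != 12) throw new AssertionError();
    System.out.println("encoded " + encoded.length + " bytes");
  }
}
```

When the estimate is exact, the only copy is the final `toByteArray()`; the 1024-byte fallback in the real class covers coders whose size observation is not cheap.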
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=232864&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-232864 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 25/Apr/19 13:38
Start Date: 25/Apr/19 13:38
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371#issuecomment-486676455

@mikekap it seems this one needs a revert after the recent merge of #8387. Can you please rebase it? I am starting the review on this one later today. Thanks (and sorry for the delay).

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 232864)
Time Spent: 9h 50m (was: 9h 40m)

> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
>            Key: BEAM-5775
>            URL: https://issues.apache.org/jira/browse/BEAM-5775
>        Project: Beam
>     Issue Type: Improvement
>     Components: runner-spark
>       Reporter: Mike Kaplinskiy
>       Assignee: Mike Kaplinskiy
>       Priority: Minor
>         Labels: triaged
>        Fix For: 2.13.0
>
>     Time Spent: 9h 50m
> Remaining Estimate: 0h
>
> Currently for storage level MEMORY_ONLY, Beam does not coder-ify the data.
> This lets Spark keep the data in memory, avoiding the serialization round
> trip. Unfortunately the logic is fairly coarse - as soon as you switch to
> MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have chosen
> to keep the data in memory, incurring the serialization overhead.
>
> Ideally Beam would serialize the data lazily - as Spark chooses to spill to
> disk. This would be a change in behavior when using Beam, but luckily Spark
> has a solution for folks that want data serialized in memory -
> MEMORY_AND_DISK_SER will keep the data serialized.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=230558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-230558 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 22/Apr/19 00:19
Start Date: 22/Apr/19 00:19
Worklog Time Spent: 10m

Work Description: mikekap commented on pull request #8371: [BEAM-5775] Move (most) of the batch spark pipelines' transformations to using lazy serialization.
URL: https://github.com/apache/beam/pull/8371

This avoids unnecessary serialization. For example, if a groupByKey is happening & part of the shuffle ends up on the current worker, we'll avoid the unnecessary serialize/deserialize cycle.

The main actual change in this PR (other than replacing `byte[]` with `ValueAndCoderSerializable`) is in `GroupNonMergingWindowsFunctions`. The semantics are slightly different in that we defer to Spark's serializer to serialize the values. This allows the previous optimization to keep working in a lazy way - if there are a lot of windows for a single value, Spark *should* serialize them only once since it's the same reference. In case Kryo is being used, the option `spark.kryo.referenceTracking` controls this behavior & defaults to true. For Java serialization, it's the only behavior available.

I didn't touch Spark streaming in this PR because I'm not sure how to address the backwards-compatibility problem. Any thoughts there?

R: @iemejia

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
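The reference-tracking behavior the PR description relies on - "Spark *should* serialize them only once since it's the same reference" - can be observed with plain JDK serialization, which always tracks references. This standalone sketch (names are invented for illustration) serializes a list whose entries all share one payload versus a list of equal-sized distinct copies, and compares the resulting stream sizes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;

public class ReferenceTrackingDemo {
  // Returns {size with n shared references, size with n distinct copies}.
  public static long[] streamSizes(int n, int payloadBytes) throws IOException {
    byte[] shared = new byte[payloadBytes];
    List<byte[]> sharedRefs = new ArrayList<>();
    List<byte[]> copies = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      sharedRefs.add(shared);             // same reference every time
      copies.add(new byte[payloadBytes]); // a fresh object every time
    }
    return new long[] {serializedSize(sharedRefs), serializedSize(copies)};
  }

  private static long serializedSize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o); // ObjectOutputStream writes each distinct object once,
                          // then emits compact back-references for repeats
    }
    return bytes.size();
  }

  public static void main(String[] args) throws IOException {
    long[] sizes = streamSizes(100, 10_000);
    // The shared-reference list stores the payload once plus back-references,
    // so it is far smaller than the list of copies.
    if (sizes[0] >= sizes[1]) throw new AssertionError();
    System.out.println("shared=" + sizes[0] + " bytes, copies=" + sizes[1] + " bytes");
  }
}
```

Kryo only behaves this way when `spark.kryo.referenceTracking` is enabled (its default), which is why the PR calls that option out explicitly.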
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226800&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226800 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 17:16
Start Date: 12/Apr/19 17:16
Worklog Time Spent: 10m

Work Description: mikekap commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482653282

No problem - thanks for taking the time. I wasn't particularly punctual in addressing feedback either, so I'm happy to share the blame.

Issue Time Tracking
---
Worklog Id: (was: 226800)
Time Spent: 9.5h (was: 9h 20m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226653&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226653 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 13:18
Start Date: 12/Apr/19 13:18
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482571162

Merged!

Issue Time Tracking
---
Worklog Id: (was: 226653)
Time Spent: 9h 10m (was: 9h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226561 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 10:03
Start Date: 12/Apr/19 10:03
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r274820730

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/coders/ValueAndCoderLazySerializable.java

[quoted diff identical to the ValueAndCoderLazySerializable review excerpt above]

Review comment: :clap:
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226562&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226562 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 10:03
Start Date: 12/Apr/19 10:03
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r274826081

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java

@@ -182,119 +176,4 @@
         .map(TranslationUtils.fromPairFunction())
         .map(TranslationUtils.toKVByWindowInValue());
   }
-
-  /**
-   * Wrapper around accumulated (combined) value with custom lazy serialization. Serialization is
-   * done through given coder and it is performed within on-serialization callbacks {@link
-   * #writeObject(ObjectOutputStream)} and {@link KryoAccumulatorSerializer#write(Kryo, Output,
-   * SerializableAccumulator)}. Both Spark's serialization mechanisms (Java Serialization, Kryo) are
-   * supported. Materialization of accumulated value is done when value is requested to avoid
-   * serialization of the coder itself.
-   *
-   * @param <AccumT>
-   */
-  public static class SerializableAccumulator<AccumT> implements Serializable {

Review comment: Huge +1

Issue Time Tracking
---
Worklog Id: (was: 226562)
Time Spent: 9h (was: 8h 50m)
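The on-serialization-callback pattern described in the removed `SerializableAccumulator` javadoc - encoding only inside `writeObject(ObjectOutputStream)`, so nothing is coder-ified until Java serialization is actually invoked - can be sketched with the JDK alone. `LazyOnWrite` is an invented name for illustration; the real classes encode through a Beam `Coder` rather than UTF-8, and also implement the Kryo-side callback.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Holds a live object in memory and encodes it only inside the writeObject
// callback -- i.e., only when serialization actually happens (e.g., a spill).
public class LazyOnWrite implements Serializable {
  private transient String value;       // never written by default serialization
  private transient boolean wasEncoded; // for demonstration only

  public LazyOnWrite(String value) { this.value = value; }

  public String value() { return value; }
  public boolean wasEncoded() { return wasEncoded; }

  private void writeObject(ObjectOutputStream out) throws IOException {
    wasEncoded = true; // encoding happens here, not at construction time
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    out.defaultWriteObject();
    out.writeInt(bytes.length); // length-prefixed, like the Beam classes
    out.write(bytes);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    value = new String(bytes, StandardCharsets.UTF_8);
  }

  public static LazyOnWrite roundTrip(LazyOnWrite holder) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(holder);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (LazyOnWrite) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    LazyOnWrite holder = new LazyOnWrite("windowed value");
    if (holder.wasEncoded()) throw new AssertionError(); // nothing encoded yet
    LazyOnWrite copy = roundTrip(holder);
    if (!copy.value().equals("windowed value")) throw new AssertionError();
    System.out.println("ok");
  }
}
```

With MEMORY_ONLY storage such a holder is never serialized at all, which is exactly the behavior the issue asks the Spark runner to preserve.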
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226509&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226509 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 07:19
Start Date: 12/Apr/19 07:19
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482466587

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 226509)
Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226508 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 07:19
Start Date: 12/Apr/19 07:19
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482466395

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 226508)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226507 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 07:19
Start Date: 12/Apr/19 07:19
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482466587

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 226507)
Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226506&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226506 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 12/Apr/19 07:19
Start Date: 12/Apr/19 07:19
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-482466327

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 226506)
Time Spent: 8h 10m (was: 8h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226503 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 12/Apr/19 07:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482466327 Run Java PreCommit Worklog Id: (was: 226503) Time Spent: 7h 50m (was: 7h 40m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226504&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226504 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 12/Apr/19 07:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482466395 Run Python PreCommit Worklog Id: (was: 226504) Time Spent: 8h (was: 7h 50m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226355&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226355 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:01 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312116 Run Java PreCommi Worklog Id: (was: 226355) Time Spent: 7.5h (was: 7h 20m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226354 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:01 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312280 Run Java PreCommit Worklog Id: (was: 226354) Time Spent: 7h 20m (was: 7h 10m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226356 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:01 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312058 Run Python PreCommit Worklog Id: (was: 226356) Time Spent: 7h 40m (was: 7.5h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226353 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:00 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312280 Run Java PreCommit Worklog Id: (was: 226353) Time Spent: 7h 10m (was: 7h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226351 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:00 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312058 Run Python PreCommit Worklog Id: (was: 226351) Time Spent: 6h 50m (was: 6h 40m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226352&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226352 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 21:00 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482312116 Run Java PreCommi Worklog Id: (was: 226352) Time Spent: 7h (was: 6h 50m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226328 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:19 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295685 Run Spark ValidatesRunner Worklog Id: (was: 226328) Time Spent: 6h 40m (was: 6.5h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226322 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295625 Run Portable_Python PreCommit Worklog Id: (was: 226322) Time Spent: 5h 50m (was: 5h 40m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226323 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295685 Run Spark ValidatesRunner Worklog Id: (was: 226323) Time Spent: 6h (was: 5h 50m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226325 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295558 Run Python PreCommit Worklog Id: (was: 226325) Time Spent: 6h 20m (was: 6h 10m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226320 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295504 Run Java PreCommit Worklog Id: (was: 226320) Time Spent: 5.5h (was: 5h 20m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226327 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295625 Run Portable_Python PreCommit Worklog Id: (was: 226327) Time Spent: 6.5h (was: 6h 20m)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=226324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-226324 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Apr/19 20:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-482295504 Run Java PreCommit Worklog Id: (was: 226324) Time Spent: 6h 10m (was: 6h)
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=220222&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220222 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 28/Mar/19 19:17 Worklog Time Spent: 10m Work Description: dmvk commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-477735154 this makes sense, I'll take a closer look tomorrow Worklog Id: (was: 220222) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=220020&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220020 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 28/Mar/19 14:23
Start Date: 28/Mar/19 14:23
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r270015945

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializer.java
## @@ -0,0 +1,50 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import java.io.IOException;
+
+/** Kryo serializer for {@link ValueAndCoderLazySerializable}. */
+public class ValueAndCoderKryoSerializer extends Serializer<ValueAndCoderLazySerializable<?>> {

Review comment: Please rename to `KryoValueAndCoderLazySerializable` and move to coders package please.

Issue Time Tracking
---
Worklog Id: (was: 220020)
Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=220021&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220021 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 28/Mar/19 14:23
Start Date: 28/Mar/19 14:23
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r270027831

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
## @@ -0,0 +1,203 @@

+/**
+ * A holder object that lets you serialize an element with a Coder with minimal wasted space.
+ * Supports both Kryo and Java serialization.
+ *
+ * There are two different representations: a deserialized representation and a serialized
+ * representation.
+ *
+ * The deserialized representation stores a Coder and the value. To serialize the value, we write
+ * a length-prefixed encoding of value, but do NOT write the Coder used.
+ *
+ * The serialized representation just reads a byte array - the value is not deserialized fully.
+ * In order to get at the deserialized value, the caller must pass the Coder used to create this
+ * instance via getOrDecode(Coder). This reverts the representation back to the deserialized
+ * representation.
+ *
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T getOrDecode(Coder<T> coder) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      value =
+          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);

Review comment: Can't we just then remove the context and that's it to get rid of the warning?

Issue Time Tracking
---
Worklog Id: (was: 220021)
Time Spent: 5h (was: 4h 50m)
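The "length-prefixed encoding" the class javadoc above describes - write a varint length, then the payload, and never the coder itself - can be sketched with a hand-rolled varint frame. The varint here is a generic 7-bits-per-byte encoding standing in for Beam's `VarInt` helper; all names are illustrative, not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class VarIntPrefixDemo {
    // Hand-rolled unsigned varint: 7 payload bits per byte, high bit = "more".
    static void encodeVarInt(int v, ByteArrayOutputStream out) {
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static int decodeVarInt(ByteArrayInputStream in) {
        int result = 0, shift = 0, b;
        do {
            b = in.read();
            result |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return result;
    }

    // Frame = varint(length) + payload. The coder never goes on the wire;
    // the reader is expected to already know it, as in getOrDecode(Coder).
    static byte[] writeLengthPrefixed(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encodeVarInt(payload.length, out);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    static byte[] readLengthPrefixed(byte[] framed) {
        ByteArrayInputStream in = new ByteArrayInputStream(framed);
        int len = decodeVarInt(in);
        byte[] payload = new byte[len];
        if (in.read(payload, 0, len) != len) throw new AssertionError("truncated frame");
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[300]; // length 300 needs a two-byte varint
        Arrays.fill(payload, (byte) 7);
        byte[] framed = writeLengthPrefixed(payload);
        if (framed.length != 302) throw new AssertionError();
        if (!Arrays.equals(readLengthPrefixed(framed), payload)) throw new AssertionError();
        System.out.println("round trip ok");
    }
}
```

Not writing the coder is what keeps the serialized representation compact: the length prefix costs one to five bytes, versus serializing an entire coder object per element.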
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=219427&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-219427 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 27/Mar/19 16:45
Start Date: 27/Mar/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-477247306

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 219427)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=219426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-219426 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 27/Mar/19 16:45
Start Date: 27/Mar/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-477247306

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 219426)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=218057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-218057 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 14:38
Start Date: 25/Mar/19 14:38
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-476225538

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 218057)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=218049&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-218049 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 14:31
Start Date: 25/Mar/19 14:31
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-476222631

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 218049)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=218050&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-218050 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 14:32
Start Date: 25/Mar/19 14:32
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-476222631

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 218050)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=218058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-218058 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 14:39
Start Date: 25/Mar/19 14:39
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-476225538

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 218058)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=217778&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-217778 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 01:45
Start Date: 25/Mar/19 01:45
Worklog Time Spent: 10m

Work Description: mikekap commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-476026946

@iemejia @VaclavPlajt updated. Sorry for the long wait. As suggested by @iemejia I combined the two use cases of lazy serialization - `ValueAndCoderLazySerializable` is almost a drop-in replacement for `SerializableAccumulator`. I also converted the tests so the serialization is now tested.

Issue Time Tracking
---
Worklog Id: (was: 217778)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=21&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-21 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 01:40
Start Date: 25/Mar/19 01:40
Worklog Time Spent: 10m

Work Description: mikekap commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r268467673

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
## @@ -0,0 +1,203 @@

+  private static class ByteSizeObserver extends ElementByteSizeObserver {
+    private long observedSize = 0;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedSize += elementByteSize;
+    }
+  }
+
+  private void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      @SuppressWarnings("unchecked")
+      Coder<T> coder = (Coder<T>) coderOrBytes;
+      int bufferSize = 1024;
+
+      if (coder.isRegisterByteSizeObserverCheap(value)) {
+        try {
+          ByteSizeObserver observer = new ByteSizeObserver();
+          coder.registerByteSizeObserver(value, observer);
+          bufferSize = (int) observer.observedSize;
+        } catch (Exception e) {
+          Throwables.throwIfUnchecked(e);
+          throw new RuntimeException(e);
+        }
+      }
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream(bufferSize);
+
+      try
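The `writeCommon` hunk quoted above picks its output-buffer size by asking the coder whether a byte-size estimate is cheap, and falls back to a 1024-byte default otherwise. A minimal sketch of that sizing decision, with a hypothetical `SizeEstimator` standing in for `Coder#isRegisterByteSizeObserverCheap` plus the `ElementByteSizeObserver` machinery:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class PresizeDemo {
    // Hypothetical stand-in for the coder's size-observer API: answers
    // "is sizing cheap for this value?" and "how many bytes would it take?".
    interface SizeEstimator<T> {
        boolean isCheap(T value);
        long estimate(T value);
    }

    // For a String, the exact UTF-8 byte length is cheap to compute.
    static final SizeEstimator<String> UTF8_LENGTH = new SizeEstimator<String>() {
        @Override public boolean isCheap(String v) { return true; }
        @Override public long estimate(String v) {
            return v.getBytes(StandardCharsets.UTF_8).length;
        }
    };

    // Mirrors the hunk above: trust a cheap estimate, else default to 1 KiB.
    static <T> int chooseBufferSize(T value, SizeEstimator<T> est) {
        return est.isCheap(value) ? (int) est.estimate(value) : 1024;
    }

    public static void main(String[] args) {
        int size = chooseBufferSize("hello", UTF8_LENGTH);
        if (size != 5) throw new AssertionError();
        // A correctly pre-sized buffer never has to grow (and re-copy) while
        // the value is being encoded into it.
        ByteArrayOutputStream out = new ByteArrayOutputStream(size);
        out.write("hello".getBytes(StandardCharsets.UTF_8), 0, size);
        System.out.println(out.size()); // prints 5
    }
}
```

The point of the cheap/expensive split is that some coders (fixed-width types, byte arrays) can report an exact size in O(1), while for others computing the size would cost as much as encoding - so the code only asks when the answer is free.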
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=217776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-217776 ]

ASF GitHub Bot logged work on BEAM-5775:
Author: ASF GitHub Bot
Created on: 25/Mar/19 01:40
Start Date: 25/Mar/19 01:40
Worklog Time Spent: 10m

Work Description: mikekap commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r268467653

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
## @@ -0,0 +1,203 @@

+      if (coder.isRegisterByteSizeObserverCheap(value)) {

Review comment: I'm not 100% sure if this matters, but aside from the observer, this code doesn't actually ever produce a `byte[]` of the serialized output. Specifically, `ByteArrayOutputStream.writeTo` is used to avoid that. If the buffer inside `ByteArrayOutputStream` is bigger than the content, this allows us to avoid an array copy. To avoid the copy properly when returning bytes, Java has a
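The copy-avoidance trick mentioned in the review comment above is observable with plain `java.io`: `ByteArrayOutputStream.toByteArray()` copies the internal buffer on every call, while `writeTo(OutputStream)` streams the internal buffer straight to the sink without materializing an intermediate array. A self-contained demonstration:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WriteToDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = "payload".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buf = new ByteArrayOutputStream(64); // capacity > content
        buf.write(payload, 0, payload.length);

        // toByteArray() allocates and copies on every call - two calls,
        // two distinct arrays:
        if (buf.toByteArray() == buf.toByteArray()) throw new AssertionError();

        // writeTo() writes the internal buffer directly to the sink, so no
        // right-sized copy of the content is ever created:
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        buf.writeTo(sink);
        if (!Arrays.equals(sink.toByteArray(), payload)) throw new AssertionError();
        System.out.println("no-copy write ok");
    }
}
```

This matters here because the encoded element is immediately handed to another stream (Kryo's `Output` or an `ObjectOutput`), so a right-sized `byte[]` of the content is never actually needed.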
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=217775&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-217775 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 25/Mar/19 01:33
Start Date: 25/Mar/19 01:33
Worklog Time Spent: 10m

Work Description: mikekap commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r268467109

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,203 @@
[quoted license header, imports, and class javadoc elided - identical to the excerpt quoted earlier in this thread]

  public T getOrDecode(Coder<T> coder) throws IOException {
    if (!(coderOrBytes instanceof Coder)) {
      value =
          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);

Review comment: As far as I can tell, these aren't deprecated - `CoderHelpers` uses these as well. I think the `@Deprecated` + `@Experimental` annotations are trying to convince `Coder` authors not to depend on these outside internal Beam development - internally, though, they should be used. (My evidence is this PR: https://github.com/apache/beam/commit/fda3a43be3277d0dca888cfa30693599d11cd5af .) Let me know if I interpreted that incorrectly, though.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 217775)
Time Spent: 3h 10m (was: 3h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=208457&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-208457 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 06/Mar/19 06:31
Start Date: 06/Mar/19 06:31
Worklog Time Spent: 10m

Work Description: VaclavPlajt commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r262804191

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,180 @@
[quoted license header, imports, and class javadoc elided - identical to the excerpt quoted earlier in this thread]

public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {

Review comment: I also support breaking the Kryo serialization functionality away. Please consider leaving out "Kryo" from the class name when doing it. Highlighting the laziness of the serialization, either in the name or the javadoc, could be a bonus.

Issue Time Tracking
---
Worklog Id: (was: 208457)
Time Spent: 2h 50m (was: 2h 40m)

> Make the spark runner not serialize data unless spark is spilling to disk
> -------------------------------------------------------------------------
>
> Key: BEAM-5775
> URL: https://issues.apache.org/jira/browse/BEAM-5775
> Project: Beam
> Issue Type: Improvement
> Components: runner-spark
> Reporter: Mike Kaplinskiy
> Assignee: Mike Kaplinskiy
> Priority: Minor
> Labels: triaged
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Currently for storage level MEMORY_ONLY, Beam does not coder-ify the data. This lets Spark keep the data in memory, avoiding the serialization round trip. Unfortunately the logic is fairly coarse - as soon as you switch to MEMORY_AND_DISK, Beam coder-ifys the data even though Spark might have chosen to keep the data in memory, incurring the serialization overhead.
>
> Ideally Beam would serialize the data lazily - as Spark chooses to spill to disk. This would be a change in behavior when using Beam, but luckily Spark has a solution for folks that want data serialized in memory - MEMORY_AND_DISK_SER will keep the data serialized.

--
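The core pattern the reviewers are discussing - a holder whose single `coderOrBytes` field is either the `Coder` (deserialized representation) or the encoded `byte[]` (serialized representation) - can be modeled without Beam or Kryo. The following is an illustrative stand-in of my own (a UTF-8 string plays the role of `T`, and a marker string plays the role of the `Coder` instance); it is not code from the PR:

```java
import java.nio.charset.StandardCharsets;

public class LazyValueSketch {
  private String value;          // stand-in for the element of type T
  private Object coderOrBytes;   // either a "coder" marker or the encoded byte[]

  LazyValueSketch(String value) {
    this.value = value;
    this.coderOrBytes = "utf8-coder"; // stand-in for the Coder instance
  }

  // Called only if and when the framework actually needs bytes
  // (e.g. Spark spilling a partition to disk).
  void encode() {
    if (!(coderOrBytes instanceof byte[])) {
      coderOrBytes = value.getBytes(StandardCharsets.UTF_8);
      value = null; // serialized representation keeps only the bytes
    }
  }

  // Mirrors getOrDecode(Coder): decode lazily, then flip back to the
  // deserialized representation so later reads are free.
  String getOrDecode() {
    if (coderOrBytes instanceof byte[]) {
      value = new String((byte[]) coderOrBytes, StandardCharsets.UTF_8);
      coderOrBytes = "utf8-coder";
    }
    return value;
  }
}
```

The single `Object` field is the space optimization the real class comments on: neither representation pays for the other's reference.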
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=208458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-208458 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 06/Mar/19 06:31
Start Date: 06/Mar/19 06:31
Worklog Time Spent: 10m

Work Description: VaclavPlajt commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r262800586

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,203 @@
[quoted license header, imports, and class javadoc elided - identical to the excerpt quoted earlier in this thread]

  public T getOrDecode(Coder<T> coder) throws IOException {
    if (!(coderOrBytes instanceof Coder)) {
      value =
          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);

Review comment: Is there a specific reason to call a deprecated method? I would prefer to avoid that.

Issue Time Tracking
---
Worklog Id: (was: 208458)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=196281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196281 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 08/Feb/19 16:46
Start Date: 08/Feb/19 16:46
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-461866882

Just pinging @VaclavPlajt, the author of the other PR, in case he has something extra to add.

Issue Time Tracking
---
Worklog Id: (was: 196281)
Time Spent: 2h 40m (was: 2.5h)
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=196278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196278 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 08/Feb/19 16:45
Start Date: 08/Feb/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r255132971

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java

@@ -383,6 +385,6 @@ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
   * @return true if the level is memory only
   */
  public static boolean avoidRddSerialization(StorageLevel level) {
-    return level.equals(StorageLevel.MEMORY_ONLY()) || level.equals(StorageLevel.MEMORY_ONLY_2());

Review comment: Ok, good to be sure about this. For [reference](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). Maybe it is worth adding a comment with that link.

Issue Time Tracking
---
Worklog Id: (was: 196278)
Time Spent: 2h 20m (was: 2h 10m)
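For context on the `avoidRddSerialization` check under review: Spark's `StorageLevel` is essentially a bundle of flags (use disk, use memory, deserialized, plus replication and off-heap options not shown here). Coder-ification can be skipped only when data is guaranteed to stay in memory in object form. The class below is a simplified stand-in of my own, not Spark's API - the flag names merely mirror Spark's:

```java
public class StorageLevels {
  // Minimal stand-in for the flags inside org.apache.spark.storage.StorageLevel.
  static final class Level {
    final boolean useDisk;
    final boolean useMemory;
    final boolean deserialized;

    Level(boolean useDisk, boolean useMemory, boolean deserialized) {
      this.useDisk = useDisk;
      this.useMemory = useMemory;
      this.deserialized = deserialized;
    }
  }

  static final Level MEMORY_ONLY = new Level(false, true, true);
  static final Level MEMORY_AND_DISK = new Level(true, true, true);
  static final Level MEMORY_AND_DISK_SER = new Level(true, true, false);

  // Serialization can be avoided only when Spark will never spill or
  // serialize: purely in-memory, deserialized storage.
  static boolean avoidRddSerialization(Level level) {
    return level.useMemory && !level.useDisk && level.deserialized;
  }
}
```

Under this model MEMORY_ONLY (and its replicated variant) qualify, while MEMORY_AND_DISK does not - which is exactly the coarseness the JIRA issue complains about, since Spark may still keep MEMORY_AND_DISK data in memory.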
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=196280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196280 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 08/Feb/19 16:45
Start Date: 08/Feb/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r255133911

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java

@@ -346,13 +345,16 @@ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
   * @param coderMap - mapping between TupleTag and a coder
   * @return a pair function to convert value to bytes via coder
   */
-  public static PairFunction<Tuple2<TupleTag<?>, WindowedValue<?>>, TupleTag<?>, byte[]>
+  public static PairFunction<
+          Tuple2<TupleTag<?>, WindowedValue<?>>,
+          TupleTag<?>,
+          ValueAndCoderKryoSerializable<WindowedValue<?>>>
       getTupleTagEncodeFunction(final Map<TupleTag<?>, Coder<WindowedValue<?>>> coderMap) {
     return tuple2 -> {
       TupleTag<?> tupleTag = tuple2._1;
       WindowedValue<?> windowedValue = tuple2._2;
       return new Tuple2<>(
-          tupleTag, CoderHelpers.toByteArray(windowedValue, coderMap.get(tupleTag)));
+          tupleTag, new ValueAndCoderKryoSerializable<>(windowedValue, coderMap.get(tupleTag)));

Review comment: As stated previously, we avoid using Kryo serialization by default. See the comment below about separating the class functionality (lazy serialization) from serialization.

Issue Time Tracking
---
Worklog Id: (was: 196280)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=196279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196279 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 08/Feb/19 16:45
Start Date: 08/Feb/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r255134906

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,180 @@
[quoted license header, imports, and class javadoc elided - identical to the excerpt quoted earlier in this thread]

public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {

Review comment: I see. Can we just break the Kryo serialization away from the concrete functionality, akin to what [SerializableAccumulator](https://github.com/apache/beam/blob/f1bb53acaced2c4d1f9837920ce0d0269e5da0ef/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java#L192) and [KryoAccumulatorSerializer](https://github.com/apache/beam/blob/f1bb53acaced2c4d1f9837920ce0d0269e5da0ef/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java#L278) do? For more details PTAL at #7398.

Issue Time Tracking
---
Worklog Id: (was: 196279)
Time Spent: 2.5h (was: 2h 20m)
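The refactoring iemejia points at (SerializableAccumulator paired with KryoAccumulatorSerializer) keeps the value-holding class free of framework-specific serialization and moves the byte-level work into a dedicated serializer type. A framework-agnostic sketch of that split - the `Serializer` interface and all names here are invented for illustration, standing in for registering a Kryo `Serializer<T>` instead of implementing `KryoSerializable` on the value class:

```java
import java.nio.charset.StandardCharsets;

public class SerializerSplitSketch {
  // The holder carries data only; it knows nothing about how it is serialized.
  static final class Holder {
    final String payload;

    Holder(String payload) {
      this.payload = payload;
    }
  }

  // Serialization concerns live in their own type, so swapping the wire
  // format (Kryo, Java serialization, a Beam Coder) never touches Holder.
  interface Serializer<T> {
    byte[] write(T value);

    T read(byte[] bytes);
  }

  static final class HolderSerializer implements Serializer<Holder> {
    @Override
    public byte[] write(Holder value) {
      return value.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Holder read(byte[] bytes) {
      return new Holder(new String(bytes, StandardCharsets.UTF_8));
    }
  }
}
```

The benefit for this PR's debate: the holder's class name no longer needs "Kryo" in it, and each serialization framework gets its own adapter rather than the value class implementing every framework's interface.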
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=196277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-196277 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 08/Feb/19 16:45
Start Date: 08/Feb/19 16:45
Worklog Time Spent: 10m

Work Description: iemejia commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r255134741

## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java

@@ -0,0 +1,203 @@
[quoted license header and imports elided - identical to the excerpt quoted earlier in this thread]

import com.google.common.base.Throwables;

Review comment: Use vendored imports `org.apache.beam.vendor.guava.v20_0.com.google.common...` (this is a relatively new thing on Beam).

Issue Time Tracking
---
Worklog Id: (was: 196277)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=195279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195279 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 06/Feb/19 18:20
Start Date: 06/Feb/19 18:20
Worklog Time Spent: 10m

Work Description: mikekap commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-461130415

@iemejia ping

Issue Time Tracking
---
Worklog Id: (was: 195279)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=185809&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185809 ]

ASF GitHub Bot logged work on BEAM-5775:

Author: ASF GitHub Bot
Created on: 16/Jan/19 14:27
Start Date: 16/Jan/19 14:27
Worklog Time Spent: 10m

Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#issuecomment-454795050

Run Spark ValidatesRunner

Issue Time Tracking
---
Worklog Id: (was: 185809)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=185798=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-185798 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 16/Jan/19 14:18 Start Date: 16/Jan/19 14:18 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-454795050 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 185798) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=182370=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-182370 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 08/Jan/19 12:56 Start Date: 08/Jan/19 12:56 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-452288658 Hi @mikekap It seems this needs a rebase after I merged #6998 - can you please do it and I will take a look immediately afterwards (sorry for being so slow with this, quite busy time + Christmas holidays). Issue Time Tracking --- Worklog Id: (was: 182370) Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=174171=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174171 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 11/Dec/18 17:36 Start Date: 11/Dec/18 17:36 Worklog Time Spent: 10m Work Description: mikekap commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r240717309
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
## @@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.VarInt;
+
+/**
+ * A holder object that lets you serialize an element with a Coder with minimal wasted space.
+ * Supports both Kryo and Java serialization.
+ *
+ * There are two different representations: a deserialized representation and a serialized
+ * representation.
+ *
+ * The deserialized representation stores a Coder and the value. To serialize the value, we write
+ * a length-prefixed encoding of value, but do NOT write the Coder used.
+ *
+ * The serialized representation just reads a byte array - the value is not deserialized fully.
+ * In order to get at the deserialized value, the caller must pass the Coder used to create this
+ * instance via get(Coder). This reverts the representation back to the deserialized representation.
+ *
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
Review comment: Unfortunately I need this to be public because the kryo registrator is in another package.
Issue Time Tracking --- Worklog Id: (was: 174171) Time Spent: 1.5h (was: 1h 20m)
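The class quoted above keeps the element live in memory and only runs the Coder when Spark actually serializes the RDD (spill to disk, replication, or network shuffle). A stripped-down, self-contained sketch of the same idea - a hypothetical `LazyValue` class, using plain `Function`s in place of a Beam `Coder` and Java serialization only, not the actual Beam implementation:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch of the lazy-encoding idea: the element stays deserialized
// in memory, and the encoder only runs when Java serialization kicks in
// (i.e. when Spark spills to disk or replicates over the network).
public class LazyValue<T> implements Serializable {
  private transient T value;                     // live object; never serialized directly
  private transient Function<T, byte[]> encoder; // not written out either
  private byte[] bytes;                          // filled in lazily by writeObject

  public LazyValue(T value, Function<T, byte[]> encoder) {
    this.value = value;
    this.encoder = encoder;
  }

  // Get the value back, decoding only if we round-tripped through serialization.
  public T get(Function<byte[], T> decoder) {
    if (value == null && bytes != null) {
      value = decoder.apply(bytes);
    }
    return value;
  }

  public boolean isEncoded() {
    return bytes != null;
  }

  // Called by Java serialization: this is the first time the encoder runs.
  private void writeObject(ObjectOutputStream out) throws IOException {
    if (bytes == null) {
      bytes = encoder.apply(value);
    }
    out.defaultWriteObject();
  }

  public static void main(String[] args) throws IOException {
    LazyValue<String> lazy =
        new LazyValue<>("hello", s -> s.getBytes(StandardCharsets.UTF_8));
    System.out.println(lazy.isEncoded()); // false: nothing encoded yet
    // Simulate Spark spilling: Java-serialize the holder.
    try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
      oos.writeObject(lazy);
    }
    System.out.println(lazy.isEncoded()); // true: encoding happened on demand
  }
}
```

The real class additionally implements `KryoSerializable` and `Externalizable` so the same lazy behavior works under both of Spark's serializer configurations.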
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173721=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173721 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 18:01 Start Date: 10/Dec/18 18:01 Worklog Time Spent: 10m Work Description: mikekap commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r240317683
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
## @@ -383,6 +385,6 @@ public static void rejectStateAndTimers(DoFn<?, ?> doFn) {
    * @return true if the level is memory only
    */
   public static boolean avoidRddSerialization(StorageLevel level) {
-    return level.equals(StorageLevel.MEMORY_ONLY()) || level.equals(StorageLevel.MEMORY_ONLY_2());
Review comment: This is a drive-by fix for using the MEMORY_ONLY_2 storage level. The code previously assumed that no serialization was necessary, which isn't strictly true since the _2 means "replicate to other nodes" - i.e. serialize over the network.
Issue Time Tracking --- Worklog Id: (was: 173721) Time Spent: 1h 10m (was: 1h)
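The review comment above argues that serialization can only be skipped when the data never needs a byte representation at all: no disk, no replication to other nodes. A self-contained sketch of that check, using a hypothetical minimal stand-in for Spark's `StorageLevel` flags (not Spark's actual class):

```java
// Hypothetical stand-in for Spark's StorageLevel, with just enough flags to show
// the check discussed above: serialization can be skipped only when data never
// leaves the local heap - no disk, deserialized in memory, single copy.
public class StorageCheck {
  static final class Level {
    final boolean useDisk;
    final boolean useMemory;
    final boolean deserialized;
    final int replication;

    Level(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
      this.useDisk = useDisk;
      this.useMemory = useMemory;
      this.deserialized = deserialized;
      this.replication = replication;
    }
  }

  static final Level MEMORY_ONLY = new Level(false, true, true, 1);
  static final Level MEMORY_ONLY_2 = new Level(false, true, true, 2); // replicated to 2 nodes
  static final Level MEMORY_AND_DISK = new Level(true, true, true, 1);

  // Mirrors the intent of avoidRddSerialization: true only when Spark will never
  // need bytes - in-memory, deserialized, and not replicated over the network.
  static boolean avoidRddSerialization(Level level) {
    return level.useMemory && !level.useDisk && level.deserialized && level.replication == 1;
  }

  public static void main(String[] args) {
    System.out.println(avoidRddSerialization(MEMORY_ONLY));     // true
    System.out.println(avoidRddSerialization(MEMORY_ONLY_2));   // false: replication serializes
    System.out.println(avoidRddSerialization(MEMORY_AND_DISK)); // false: may spill to disk
  }
}
```

This is why the drive-by fix drops `MEMORY_ONLY_2` from the skip list: a `_2` level must serialize to ship the replica to another node, even though it never touches disk.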
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173722=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173722 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 18:04 Start Date: 10/Dec/18 18:04 Worklog Time Spent: 10m Work Description: mikekap commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-445912555 @iemejia I'm a bit confused about the top level comment. This PR works in both Kryo and Java serialization modes - and doesn't do anything to change the default. Is there something else you wanted me to check? Issue Time Tracking --- Worklog Id: (was: 173722) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173685=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173685 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 16:42 Start Date: 10/Dec/18 16:42 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r233476691
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T get(Coder<T> coder) throws IOException {
Review comment: Can you please rename this to `getOrDecodeValue` or `getOrDecode` (this may sound picky but the intention of get with an argument is a bit lost).
Issue Time Tracking --- Worklog Id: (was: 173685) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173687=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173687 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 16:42 Start Date: 10/Dec/18 16:42 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r233476897
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T get(Coder<T> coder) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      value =
+          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);
+      this.coderOrBytes = coder;
+    }
+
+    return value;
+  }
+
+  private void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      int bufferSize = 1024;
+      // TODO: use isRegisterByteSizeObserverCheap
Review comment: TODO ? can this be done in this PR?
Issue Time Tracking --- Worklog Id: (was: 173687) Time Spent: 1h (was: 50m)
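The `writeCommon` method quoted above frames the payload as a varint length followed by the coder-encoded bytes. A self-contained sketch of that framing, with hand-rolled base-128 varint helpers standing in for Beam's `org.apache.beam.sdk.util.VarInt` (the helper names here are illustrative, not Beam's API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Sketch of the length-prefixed framing used in writeCommon: a base-128 varint
// length, then the payload bytes. The coder itself is never written, so the
// reader must already know which Coder produced the payload.
public class LengthPrefix {
  static void encodeVarInt(int v, OutputStream out) throws IOException {
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80); // low 7 bits, continuation bit set
      v >>>= 7;
    }
    out.write(v); // final byte, continuation bit clear
  }

  static int decodeVarInt(InputStream in) throws IOException {
    int result = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      result |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }

  static byte[] frame(byte[] payload) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encodeVarInt(payload.length, out); // length prefix
    out.write(payload);                // then the encoded value
    return out.toByteArray();
  }

  static byte[] unframe(InputStream in) throws IOException {
    byte[] payload = new byte[decodeVarInt(in)];
    int off = 0;
    while (off < payload.length) {
      off += in.read(payload, off, payload.length - off);
    }
    return payload;
  }

  public static void main(String[] args) throws IOException {
    byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
    byte[] back = unframe(new ByteArrayInputStream(framed));
    System.out.println(framed.length);                            // 6: 1-byte length + 5 bytes
    System.out.println(new String(back, StandardCharsets.UTF_8)); // hello
  }
}
```

The `bufferSize` / `isRegisterByteSizeObserverCheap` TODO in the quoted hunk concerns the other branch: when the value is still deserialized, the code must guess an output buffer size before encoding, and a cheap byte-size estimate from the coder would avoid growing the buffer.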
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173684=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173684 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 16:42 Start Date: 10/Dec/18 16:42 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r234687933
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
   public static boolean avoidRddSerialization(StorageLevel level) {
-    return level.equals(StorageLevel.MEMORY_ONLY()) || level.equals(StorageLevel.MEMORY_ONLY_2());
Review comment: Why is `MEMORY_ONLY_2` not needed anymore?
Issue Time Tracking --- Worklog Id: (was: 173684) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=173686=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173686 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 10/Dec/18 16:42 Start Date: 10/Dec/18 16:42 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#discussion_r233477074
## File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
Review comment: package private
Issue Time Tracking --- Worklog Id: (was: 173686) Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=165836=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165836 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 14/Nov/18 08:57 Start Date: 14/Nov/18 08:57 Worklog Time Spent: 10m Work Description: iemejia commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-438586482 Run Spark ValidatesRunner Issue Time Tracking --- Worklog Id: (was: 165836) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=165257=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165257 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 13/Nov/18 01:14 Start Date: 13/Nov/18 01:14 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714#issuecomment-438091517 R: @iemejia can you please review or forward to someone who is familiar with this code. Issue Time Tracking --- Worklog Id: (was: 165257) Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-5775) Make the spark runner not serialize data unless spark is spilling to disk
[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=155233=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155233 ] ASF GitHub Bot logged work on BEAM-5775: Author: ASF GitHub Bot Created on: 17/Oct/18 00:51 Start Date: 17/Oct/18 00:51 Worklog Time Spent: 10m
Work Description: mikekap opened a new pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence. URL: https://github.com/apache/beam/pull/6714
Spark's `StorageLevel` is the preferred mechanism for deciding what is serialized, when, and where. With this change, Beam respects Spark's wish to keep data deserialized in memory, even if the storage level *may* swap to disk (e.g. MEMORY_AND_DISK).
This PR also includes a drive-by fix for the `MEMORY_ONLY_2` storage level. The code previously assumed that no serialization was necessary, which isn't strictly true: the `_2` suffix means "replicate to other nodes", i.e. the data is serialized over the network.
Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.
[Post-Commit Tests Status (on master branch): table of Jenkins build-status badges (builds.apache.org) for the Go, Java, and Python SDKs across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners]
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
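The lazy-encoding idea in the PR above can be sketched with plain Java serialization hooks. This is a hypothetical illustration, not Beam's actual implementation: the real change works with Beam Coders and Spark's serializer, while this stand-in "encodes" a String as UTF-8 bytes only when `writeObject` runs — i.e. only when something actually needs the serialized form, such as a spill to disk or replication over the network:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of lazy encoding for persistence (illustrative only,
// not Beam's actual class): the value stays as a plain object while held in
// memory, and is only encoded when Java serialization is invoked.
public class LazyValue implements Serializable {
    private transient String value;  // kept deserialized while in memory
    private byte[] encoded;          // populated only when serialization occurs

    public LazyValue(String value) { this.value = value; }

    public String get() {
        if (value == null && encoded != null) {
            // Decode on demand after deserialization.
            value = new String(encoded, StandardCharsets.UTF_8);
        }
        return value;
    }

    // Invoked by Java serialization only when bytes are actually needed.
    private void writeObject(ObjectOutputStream out) throws IOException {
        encoded = value.getBytes(StandardCharsets.UTF_8); // stand-in for a Coder
        out.defaultWriteObject();
    }

    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restores 'encoded'; get() decodes lazily
    }
}
```

Because `value` is transient and `encoded` is only populated inside `writeObject`, an instance that Spark keeps deserialized in memory never pays the encoding cost.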