How to get a single available message from kafka (case where OffsetRange.fromOffset == OffsetRange.untilOffset)
Hi, I am using KafkaUtils.createRDD to retrieve data from Kafka for batch processing, and when invoking KafkaUtils.createRDD with an OffsetRange where OffsetRange.fromOffset == OffsetRange.untilOffset for a particular partition, I get an empty RDD. The documentation is clear that untilOffset is exclusive and fromOffset inclusive, but if I use OffsetRange.untilOffset + 1 I get an invalid OffsetRange exception during the check. Since this should also apply in general (if untilOffset is exclusive you cannot fetch it), does it mean that untilOffset is also non-existent in Kafka (and thus always exclusive), or am I missing something? regards

p.s. by manually using the Kafka protocol to query the offsets, I see that querying with kafka.api.OffsetRequest.EarliestTime() and kafka.api.OffsetRequest.LatestTime() returns the same offset, set to a positive value
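For reference, a minimal sketch of the single-message case under discussion, assuming the Spark 1.3+ Kafka direct API; the broker address, topic, partition, and offset below are placeholders, not values from the post. Since untilOffset is exclusive, fetching exactly the message at offset n means asking for [n, n + 1), and n must be strictly below the partition's current log-end offset:

    // Sketch only: fetch the single message at offset n from partition 0.
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker:9092") // placeholder broker
    val n = 42L // offset of the one message to fetch; must be < log-end offset
    val range = OffsetRange("my-topic", 0, n, n + 1) // [n, n+1) == exactly one message
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, Array(range)) // sc: an existing SparkContext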
Re: DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi Lian,

Thank you for the tip. Indeed, there were a lot of distinct values in my result set (approximately 3000). As you suggested, I decided to partition the data first on a column with much smaller cardinality.

Thanks
n

On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian lian.cs@gmail.com wrote:

Hi Nikos,

How many columns and distinct values of some_column are there in the DataFrame? The Parquet writer is known to be very memory-consuming for wide tables, and lots of distinct partition column values result in many concurrent Parquet writers. One possible workaround is to first repartition the data by the partition columns.

Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:

Hi, I am trying to test partitioning for DataFrames with Parquet, so I attempted df.write().partitionBy(some_column).parquet(path) on a small dataset of 20,000 records which, when saved as Parquet locally with gzip, takes 4 MB of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always fails with an OutOfMemoryError. Does anyone have any ideas?

stack trace: <snipped in this quote; the full trace appears in the original message below>
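A minimal sketch of the workaround Cheng describes, with some_column and path standing in for the real names. Note that repartitioning by column expressions (repartition(Column*)) only appeared in later Spark releases (1.6+); on 1.4/1.5 an equivalent effect needs a sort or an RDD-level repartition by key:

    // Cluster rows by the partition column before writing, so each task holds
    // only a few concurrent Parquet writers instead of one per distinct value
    // it happens to see. Names are placeholders.
    val clustered = df.repartition(df("some_column")) // Spark 1.6+ API
    clustered.write.partitionBy("some_column").parquet(path)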
DataFrame.write().partitionBy(some_column).parquet(path) produces OutOfMemory with very few items
Hi, I am trying to test partitioning for DataFrames with Parquet, so I attempted df.write().partitionBy(some_column).parquet(path) on a small dataset of 20,000 records which, when saved as Parquet locally with gzip, takes 4 MB of disk space. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always fails with an OutOfMemoryError. Does anyone have any ideas?

stack trace:

[Stage 2: (0 + 4) / 8]2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
    at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
    at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
    at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
    at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Java heap space
    at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
    at parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
    at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
    at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:111)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
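Not a suggestion from this thread, but a commonly cited knob for this class of OOM, offered as a hedged sketch: each open partition writer buffers roughly one row group in memory, so shrinking the Parquet row-group size reduces the per-writer footprint. The 16 MB value below is purely illustrative (parquet-mr's default is 128 MB), and whether it helps depends on the Parquet version in use:

    // Hedged sketch: lower parquet.block.size (the row-group size) so that each
    // of the many concurrent dynamic-partition writers buffers less memory.
    sc.hadoopConfiguration.setInt("parquet.block.size", 16 * 1024 * 1024)
    df.write.partitionBy("some_column").parquet(path) // names are placeholders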
Re: updateStateByKey performance / API
Hi Akhil,

Yes, that's what we are planning on doing at the end of the day. At the moment I am doing performance testing before the job hits production, and testing on 4 cores to get baseline figures I deduced that in order to grow to 10-15 million keys we'll need a batch interval of ~20 secs if we don't want to allocate more than 8 cores to this job. The thing is that since we have a big silent window in the user interactions, where the stream will have very little data, we would like to be able to use these cores for batch processing during that window, but we can't the way it currently works.

best regards
n

On Wed, Mar 18, 2015 at 12:40 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can always throw more machines at this and see if the performance increases, since you haven't mentioned anything regarding your # of cores etc.

Thanks
Best Regards

On Wed, Mar 18, 2015 at 11:42 AM, nvrs nvior...@gmail.com wrote:

Hi all,

We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value an object with some state about the user. The problem is that, using Kryo serialization for 5M users, this gets really slow, to the point that we have to increase the period to more than 10 seconds so as not to fall behind. The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state key and update the state. I understand that the reason for iterating over the whole state set is to evict items or update state for everyone for time-dependent computations, but this does not apply in our situation and it hurts performance really badly. Is there a possibility of implementing in the future an extra call in the API for updating only a specific subset of keys?

p.s. I will try ASAP setting the DStream as non-serialized, but then I am worried about GC and checkpointing performance
updateStateByKey performance / API
Hi all, We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value an object with some state about the user. The problem is that, using Kryo serialization for 5M users, this gets really slow, to the point that we have to increase the period to more than 10 seconds so as not to fall behind. The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state key and update the state. I understand that the reason for iterating over the whole state set is to evict items or update state for everyone for time-dependent computations, but this does not apply in our situation and it hurts performance really badly. Is there a possibility of implementing in the future an extra call in the API for updating only a specific subset of keys?

p.s. I will try ASAP setting the DStream as non-serialized, but then I am worried about GC and checkpointing performance
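For context, a minimal sketch of the pattern described above (all types and names are illustrative, not taken from the post). The point it demonstrates is that updateStateByKey invokes the update function for every key held in state on every batch, idle users included:

    // Illustrative sketch: per-user state updated from a DStream of
    // (userId, actionCode) pairs named userActions (assumed to exist).
    case class UserState(lastAction: Int, events: Long)

    def update(actions: Seq[Int], state: Option[UserState]): Option[UserState] = {
      val prev = state.getOrElse(UserState(0, 0L))
      if (actions.isEmpty) Some(prev) // still invoked for every silent user
      else Some(UserState(actions.last, prev.events + actions.size))
    }

    val stateStream = userActions.updateStateByKey[UserState](update _)

(For what it's worth, later Spark releases added mapWithState, which only calls the function for keys that received data in the current batch; that addresses exactly the subset-of-keys request made here.)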