The OOM happens in the driver, so you may also need to give the driver more memory.
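For example, a minimal sketch of resubmitting with a bigger driver plus the fewer-but-bigger executor layout suggested below (assuming the job is launched through spark-submit or the pyspark launcher, which accept the same flags; all sizes are only examples and depend on your cluster and YARN container limits):

# sizes below are illustrative; your_script.py is a placeholder
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 8g \
  --conf spark.yarn.driver.memoryOverhead=2048 \
  --num-executors 16 \
  --executor-memory 16g \
  your_script.py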
On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
> You are using lots of tiny executors (128 executors with only 2G of
> memory each); could you try bigger executors (for example, 16 x 16G)?
>
> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>
>> So I wrote some code to reproduce the problem.
>>
>> I assume here that a pipeline should be able to transform a categorical
>> feature with a few million levels.
>> So I create a dataframe with the categorical feature ('id'), apply a
>> StringIndexer and OneHotEncoder transformer, and run a loop in which I
>> increase the number of levels.
>> It breaks at 1,276,000 levels.
>>
>> Shall I report this as a ticket in JIRA?
>>
>> ____________
>>
>>
>> from pyspark.sql.functions import rand
>> from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
>> from pyspark.ml import Pipeline
>>
>> start_id = 100000
>> n = 5000000
>> step = (n - start_id) / 25
>>
>> for i in xrange(start_id, start_id + n, step):
>>     print "#########\n {}".format(i)
>>     dfr = (sqlContext
>>            .range(start_id, start_id + i)
>>            .withColumn('label', rand(seed=10))
>>            .withColumn('feat2', rand(seed=101))
>>            # .withColumn('normal', randn(seed=27))
>>            ).repartition(32).cache()
>>     # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
>>     dfr.show(1)
>>     print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())
>>
>>     categorical_feature = ['id']
>>     stages = []
>>
>>     for c in categorical_feature:
>>         stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>         stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>
>>     columns = ["{}OHE".format(x) for x in categorical_feature]
>>     columns.append('feat2')
>>
>>     assembler = VectorAssembler(
>>         inputCols=columns,
>>         outputCol="features")
>>     stages.append(assembler)
>>
>>     df2 = dfr
>>
>>     pipeline = Pipeline(stages=stages)
>>     pipeline_fitted = pipeline.fit(df2)
>>     df3 = pipeline_fitted.transform(df2)
>>     df3.show(1)
>>     dfr.unpersist()
>>
>>
>> ____________
>>
>> Output:
>>
>>
>> #########
>> 100000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |183601|0.38693226548356197|0.04485291680169634|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 100000 rows (and therefore 100000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |183601|0.38693226548356197|0.04485291680169634| 83240.0|(100000,[83240],[...|(100001,[83240,10...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 296000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |137008| 0.2996020619810592|0.38693226548356197|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |137008| 0.2996020619810592|0.38693226548356197| 35347.0|(296000,[35347],[...|(296001,[35347,29...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 492000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |534351| 0.9450641392552516|0.23472935141246665|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |534351| 0.9450641392552516|0.23472935141246665|  3656.0|(492000,[3656],[1...|(492001,[3656,492...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 688000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |573008| 0.3059347083549171| 0.4846147657830415|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 688000 rows (and therefore 688000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |573008| 0.3059347083549171| 0.4846147657830415|475855.0|(688000,[475855],...|(688001,[475855,6...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 884000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |970195|0.34345290476989165| 0.9843176058907069|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 884000 rows (and therefore 884000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |970195|0.34345290476989165| 0.9843176058907069|333915.0|(884000,[333915],...|(884001,[333915,8...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 1080000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |403758| 0.6333344187975314|  0.774327685753309|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 1080000 rows (and therefore 1080000 levels will be one hot encoded)
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |    id|              label|              feat2| idIndex|               idOHE|            features|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> |403758| 0.6333344187975314|  0.774327685753309|287898.0|(1080000,[287898]...|(1080001,[287898,...|
>> +------+-------------------+-------------------+--------+--------------------+--------------------+
>> only showing top 1 row
>>
>> #########
>> 1276000
>> +------+-------------------+-------------------+
>> |    id|              label|              feat2|
>> +------+-------------------+-------------------+
>> |508726| 0.2513814327408137| 0.8480577183702391|
>> +------+-------------------+-------------------+
>> only showing top 1 row
>>
>> This dataframe has 1276000 rows (and therefore 1276000 levels will be one hot encoded)
>>
>> ---------------------------------------------------------------------------
>> Py4JJavaError Traceback (most recent call last)
>> <ipython-input-2-f5c9fe263872> in <module>()
>> 38 pipeline = Pipeline(stages=stages)
>> 39 pipeline_fitted = pipeline.fit(df2)
>> ---> 40 df3 = pipeline_fitted.transform(df2)
>> 41 df3.show(1)
>> 42 dfr.unpersist()
>>
>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>> 103 return self.copy(params)._transform(dataset)
>> 104 else:
>> --> 105 return self._transform(dataset)
>> 106 else:
>> 107 raise ValueError("Params must be a param map but got %s." % type(params))
>>
>> /opt/spark/2.0.0/python/pyspark/ml/pipeline.py in _transform(self, dataset)
>> 196 def _transform(self, dataset):
>> 197 for t in self.stages:
>> --> 198 dataset = t.transform(dataset)
>> 199 return dataset
>> 200
>>
>> /opt/spark/2.0.0/python/pyspark/ml/base.py in transform(self, dataset, params)
>> 103 return self.copy(params)._transform(dataset)
>> 104 else:
>> --> 105 return self._transform(dataset)
>> 106 else:
>> 107 raise ValueError("Params must be a param map but got %s." % type(params))
>>
>> /opt/spark/2.0.0/python/pyspark/ml/wrapper.py in _transform(self, dataset)
>> 227 def _transform(self, dataset):
>> 228 self._transfer_params_to_java()
>> --> 229 return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
>> 230
>> 231
>>
>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>> 931 answer = self.gateway_client.send_command(command)
>> 932 return_value = get_return_value(
>> --> 933 answer, self.gateway_client, self.target_id, self.name)
>> 934
>> 935 for temp_arg in temp_args:
>>
>> /opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
>> 61 def deco(*a, **kw):
>> 62 try:
>> ---> 63 return f(*a, **kw)
>> 64 except py4j.protocol.Py4JJavaError as e:
>> 65 s = e.java_exception.toString()
>>
>> /opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
>> 310 raise Py4JJavaError(
>> 311 "An error occurred while calling {0}{1}{2}.\n".
>> --> 312 format(target_id, ".", name), value)
>> 313 else:
>> 314 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o408.transform.
>> : java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at scala.collection.immutable.Stream$.from(Stream.scala:1262)
>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>> at scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
>> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
>> at scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
>> at scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
>> at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
>> at scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
>> at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
>> at scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
>> at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
>> at scala.collection.SeqLike$class.size(SeqLike.scala:106)
>> at scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
>> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
>> at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>> at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
>> at scala.Option.map(Option.scala:146)
>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
>> at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
>> at org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
>> at org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
>> at org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at py4j.Gateway.invoke(Gateway.java:280)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>>
>>
>>
>>
>> Spark Properties
>> Name                                  Value
>> spark.app.name                        pyspark-shell
>> spark.driver.cores                    1
>> spark.driver.extraJavaOptions         -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>> spark.driver.memory                   2g
>> spark.dynamicAllocation.enabled       FALSE
>> spark.eventLog.dir                    hdfs:///spark/history
>> spark.eventLog.enabled                TRUE
>> spark.executor.cores                  1
>> spark.executor.extraJavaOptions       -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
>> spark.executor.id                     driver
>> spark.executor.instances              128
>> spark.executor.memory                 2g
>> spark.history.fs.logDirectory         hdfs:///spark/history
>> spark.master                          yarn-client
>> spark.memory.fraction                 0.7
>> spark.memory.storageFraction          0.5
>> spark.rdd.compress                    TRUE
>> spark.scheduler.mode                  FIFO
>> spark.serializer.objectStreamReset    100
>> spark.shuffle.service.enabled         FALSE
>> spark.speculation                     TRUE
>> spark.submit.deployMode               client
>> spark.task.maxFailures                10
>> spark.yarn.executor.memoryOverhead    2048
>> spark.yarn.isPython                   TRUE
>>
>>
>> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com>
>> wrote:
>>
>> Ok, interesting. Would be interested to see how it compares.
>>
>> By the way, the feature size you select for the hasher should be a power of
>> 2 (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes
>> are evenly distributed (see the section on HashingTF under
>> http://spark.apache.org/docs/latest/ml-features.html#tf-idf).
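>>
>> Something like this is what I have in mind (only a sketch; the 'id_terms' /
>> 'idHashed' names and the 2**24 size are placeholders, and dfr is the cached
>> dataframe from your repro):
>>
>> from pyspark.ml.feature import HashingTF
>> from pyspark.sql.functions import array, col
>>
>> # HashingTF expects an array of terms, so wrap the categorical column
>> # in a single-element array of strings before hashing it into a sparse vector.
>> terms = dfr.withColumn("id_terms", array(col("id").cast("string")))
>> hasher = HashingTF(inputCol="id_terms", outputCol="idHashed", numFeatures=2**24)
>> hasher.transform(terms).select("id", "idHashed").show(1)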
>>
>> On Thu, 11 Aug 2016 at 22:14 Ben Teeuwen <bteeu...@gmail.com> wrote:
>>>
>>> Thanks Nick, I played around with the hashing trick. When I set numFeatures
>>> to the number of distinct values of the largest sparse feature, I ended up
>>> with half of them colliding. When I raised numFeatures to get fewer
>>> collisions, I soon ran into the same memory problems as before. To be
>>> honest, I didn’t test the impact of more or fewer collisions on the
>>> quality of the predictions, but tunnel-visioned on getting it to work
>>> with the full sparsity.
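>>>
>>> (Roughly how I checked the collision rate, following the hashing sketch
>>> above; the 'hasher', 'terms' and 'idHashed' names are just the placeholders
>>> from that sketch:
>>>
>>> from pyspark.sql.functions import countDistinct, udf
>>> from pyspark.sql.types import IntegerType
>>>
>>> # one term per row means each sparse vector has exactly one index: the hash
>>> # bucket. Compare the number of distinct buckets to the number of distinct values.
>>> bucket = udf(lambda v: int(v.indices[0]), IntegerType())
>>> hashed = hasher.transform(terms)
>>> n_values = hashed.agg(countDistinct("id")).first()[0]
>>> n_buckets = hashed.select(bucket("idHashed").alias("b")).agg(countDistinct("b")).first()[0]
>>> print "{} distinct values ended up in {} distinct buckets".format(n_values, n_buckets)
>>> )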
>>>
>>> Before this, I worked in RDD land: zipWithIndex on an RDD of the distinct
>>> values (plus one 'missing' entry for values unseen until predict time),
>>> collectAsMap, broadcast the map, a UDF generating the sparse vector, and
>>> assembling the vectors manually. To move into dataframe land, I wrote:
>>>
>>> def getMappings(mode):
>>>     mappings = defaultdict(dict)
>>>     max_index = 0
>>>     for c in cat_int[:]:  # for every categorical variable
>>>
>>>         logging.info("starting with {}".format(c))
>>>         if mode == 'train':
>>>             grouped = (df2
>>>                 .groupBy(c).count().orderBy('count', ascending=False)  # get counts, ordered from largest to smallest
>>>                 .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
>>>                 .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
>>>                 .drop('n')  # drop the column with static 1 values used for the cumulative sum
>>>                 )
>>>             logging.info("Got {} rows.".format(grouped.count()))
>>>             grouped.show()
>>>             logging.info('getting max')
>>>             max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so the next categorical feature starts with it.
>>>             logging.info("max_index has become: {}".format(max_index))
>>>             logging.info('adding missing value, so we also train on this and prediction data missing it. ')
>>>             schema = grouped.schema
>>>             logging.info(schema)
>>>             grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add index for missing value, for values during predict that are unseen during training.
>>>             max_index += 1
>>>             saveto = "{}/{}".format(path, c)
>>>             logging.info("Writing to: {}".format(saveto))
>>>             grouped.write.parquet(saveto, mode='overwrite')
>>>
>>>         elif mode == 'predict':
>>>             loadfrom = "{}/{}".format(path, c)
>>>             logging.info("Reading from: {}".format(loadfrom))
>>>             grouped = spark.read.parquet(loadfrom)
>>>
>>>         logging.info("Adding to dictionary")
>>>         mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # build up dictionary to be broadcasted later on, used for creating sparse vectors
>>>         max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()
>>>
>>>     logging.info("Sanity check for indexes:")
>>>     for c in cat_int[:]:
>>>         logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))  # some logging to confirm the indexes.
>>>         logging.info("Missing value = {}".format(mappings[c]['missing']))
>>>     return max_index, mappings
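>>>
>>> And then, roughly, the usage on top of it; the encode helper and the 'id' /
>>> 'id_vec' column names are just for illustration, while sc, df2 and the
>>> mappings come from the surrounding code:
>>>
>>> from pyspark.ml.linalg import SparseVector, VectorUDT
>>> from pyspark.sql.functions import udf
>>>
>>> max_index, mappings = getMappings('train')
>>> b_mappings = sc.broadcast(mappings)  # ship the small lookup dicts to the executors once
>>>
>>> def encode(c, size):
>>>     # map a raw value to its index, falling back to the 'missing' slot for unseen values
>>>     def to_vec(value):
>>>         m = b_mappings.value[c]
>>>         return SparseVector(size, [m.get(value, m['missing'])], [1.0])
>>>     return udf(to_vec, VectorUDT())
>>>
>>> df_encoded = df2.withColumn('id_vec', encode('id', max_index + 1)(df2['id']))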
>>>
>>> I’d love to see the StringIndexer + OneHotEncoder transformers cope with
>>> missing values during prediction; for now I’ll work with the hacked stuff
>>> above :).
>>> (.. and I should compare the performance with using the hashing trick.)
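>>> (If I read the 2.0 docs right, the only built-in knob for this today is
>>> StringIndexer's handleInvalid, which can only drop rows with unseen labels
>>> rather than map them to a 'missing' bucket, something like
>>>
>>> indexer = StringIndexer(inputCol='id', outputCol='idIndex', handleInvalid='skip')
>>>
>>> so it doesn't really replace the mapping above.)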
>>>
>>> Ben
>>
>>
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org