A rough estimate of the worst-case memory requirement for the driver is about 2 * k * runs * numFeatures * numPartitions * 8 bytes. I put the 2 at the beginning because the previous centers are still in memory while the new center updates are being received. -Xiangrui
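To make the estimate concrete, here is a minimal sketch in plain Python that plugs in the numbers from this thread (the helper name is mine, and runs=1 matches the MLlib default):

def kmeans_driver_mem_gb(k, num_features, num_partitions, runs=1):
    # Worst case per the estimate above: the previous centers stay in
    # memory (the leading factor of 2) while k * numFeatures doubles
    # (8 bytes each) arrive from each of the numPartitions partitions.
    return 2.0 * k * runs * num_features * num_partitions * 8 / 1e9

for k in (1000, 1500):
    print("k = %d -> ~%.1f GB worst case" % (k, kmeans_driver_mem_gb(k, 80000, 10)))
# k = 1000 -> ~12.8 GB worst case
# k = 1500 -> ~19.2 GB worst case

On those assumptions a 15g driver clears K=1000 but not K=1500, which is consistent with the behavior reported below.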
On Fri, Jun 19, 2015 at 9:02 AM, Rogers Jeffrey <[email protected]> wrote:
> Thanks. Setting the driver memory property worked for K=1000. But when I
> increased K to 1500 I get the following error:
>
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB)
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB)
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB)
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB)
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB)
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5
>
> Exception in thread "Thread-2" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:2367)
>         at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>         at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>         at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>         at java.lang.StringBuilder.append(StringBuilder.java:214)
>         at py4j.Protocol.getOutputCommand(Protocol.java:305)
>         at py4j.commands.CallCommand.execute(CallCommand.java:82)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>         at java.lang.Thread.run(Thread.java:745)
>
> Exception in thread "Thread-300" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         at java.util.Arrays.copyOf(Arrays.java:2367)
>         at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>         at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>         at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>         at java.lang.StringBuilder.append(StringBuilder.java:214)
>         at py4j.Protocol.getOutputCommand(Protocol.java:305)
>         at py4j.commands.CallCommand.execute(CallCommand.java:82)
>         at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> Is there any method/guideline through which I can understand the memory
> requirement beforehand and make the appropriate configurations?
>
> Regards,
> Rogers Jeffrey L
>
> On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey <[email protected]> wrote:
>>
>> I am submitting the application from a Python notebook. I am launching
>> pyspark as follows:
>>
>> SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com \
>> SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g \
>> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1 \
>> PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1 \
>> ./spark/bin/pyspark --master spark://54.165.202.17.compute-1.amazonaws.com:7077 --deploy-mode client
>>
>> I guess I should be adding another extra argument, --conf
>> "spark.driver.memory=15g". Is that correct?
>>
>> Regards,
>> Rogers Jeffrey L
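That is indeed the right place for it: spark.driver.memory takes effect only if it is set before the driver JVM starts, so in client mode it has to be passed to pyspark/spark-submit (or put in spark-defaults.conf) rather than set on a SparkConf inside the notebook. A hedged sketch follows; the 20g figure comes from the worst-case estimate above rather than from the thread, and sc._conf is a private PySpark attribute used here only as a debugging aid:

# Pass the driver heap on the command line; --driver-memory is the
# spark-submit/pyspark shorthand for --conf "spark.driver.memory=...".
#
#   ./spark/bin/pyspark --master spark://<master-host>:7077 \
#       --deploy-mode client --driver-memory 20g
#
# Once the shell is up, confirm what the driver actually received:
print(sc._conf.get("spark.driver.memory"))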
>> On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng <[email protected]> wrote:
>>>
>>> With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
>>> store the cluster centers. That is ~600MB. If there are 10 partitions,
>>> you might need 6GB on the driver to collect updates from workers. I
>>> guess the driver died. Did you specify driver memory with spark-submit?
>>> -Xiangrui
>>>
>>> On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey <[email protected]> wrote:
>>> > Hi All,
>>> >
>>> > I am trying to run KMeans clustering on a large data set with 12,000
>>> > points and 80,000 dimensions. I have a Spark cluster in EC2 standalone
>>> > mode with 8 workers running on 2 slaves with 160 GB RAM and 40 vCPUs.
>>> >
>>> > My code is as follows:
>>> >
>>> > import numpy as np
>>> > from pyspark import SparkContext
>>> > from pyspark.mllib.clustering import KMeans
>>> > from pyspark.mllib.linalg import Vectors
>>> >
>>> > def convert_into_sparse_vector(A):
>>> >     # keep only the non-NaN entries, keyed by their positions
>>> >     non_nan_indices = np.nonzero(~np.isnan(A))
>>> >     non_nan_values = A[non_nan_indices]
>>> >     dictionary = dict(zip(non_nan_indices[0], non_nan_values))
>>> >     return Vectors.sparse(len(A), dictionary)
>>> >
>>> > X = [convert_into_sparse_vector(A) for A in complete_dataframe.values]
>>> > sc = SparkContext(appName="parallel_kmeans")
>>> > data = sc.parallelize(X, 10)
>>> > model = KMeans.train(data, 1000, initializationMode="k-means||")
>>> >
>>> > where complete_dataframe is a pandas DataFrame that holds my data.
>>> >
>>> > I get the error: Py4JNetworkError: An error occurred while trying to
>>> > connect to the Java server.
>>> >
>>> > The error trace is as follows:
>>> >
>>> >> ----------------------------------------
>>> >> Exception happened during processing of request from ('127.0.0.1', 41360)
>>> >> Traceback (most recent call last):
>>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 283, in _handle_request_noblock
>>> >>     self.process_request(request, client_address)
>>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 309, in process_request
>>> >>     self.finish_request(request, client_address)
>>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 322, in finish_request
>>> >>     self.RequestHandlerClass(request, client_address, self)
>>> >>   File "/usr/lib64/python2.6/SocketServer.py", line 617, in __init__
>>> >>     self.handle()
>>> >>   File "/root/spark/python/pyspark/accumulators.py", line 235, in handle
>>> >>     num_updates = read_int(self.rfile)
>>> >>   File "/root/spark/python/pyspark/serializers.py", line 544, in read_int
>>> >>     raise EOFError
>>> >> EOFError
>>> >> ----------------------------------------
>>> >>
>>> >> ---------------------------------------------------------------------------
>>> >> Py4JNetworkError                          Traceback (most recent call last)
>>> >> <ipython-input-13-3dd00c2c5e93> in <module>()
>>> >> ----> 1 model = KMeans.train(data, 1000, initializationMode="k-means||")
>>> >>
>>> >> /root/spark/python/pyspark/mllib/clustering.pyc in train(cls, rdd, k, maxIterations, runs, initializationMode, seed, initializationSteps, epsilon)
>>> >>     134         """Train a k-means clustering model."""
>>> >>     135         model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations,
>>> >> --> 136                               runs, initializationMode, seed, initializationSteps, epsilon)
>>> >>     137         centers = callJavaFunc(rdd.context, model.clusterCenters)
>>> >>     138         return KMeansModel([c.toArray() for c in centers])
>>> >>
>>> >> /root/spark/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args)
>>> >>     126     sc = SparkContext._active_spark_context
>>> >>     127     api = getattr(sc._jvm.PythonMLLibAPI(), name)
>>> >> --> 128     return callJavaFunc(sc, api, *args)
>>> >>     129
>>> >>     130
>>> >>
>>> >> /root/spark/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args)
>>> >>     119     """ Call Java Function """
>>> >>     120     args = [_py2java(sc, a) for a in args]
>>> >> --> 121     return _java2py(sc, func(*args))
>>> >>     122
>>> >>     123
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
>>> >>     534         END_COMMAND_PART
>>> >>     535
>>> >> --> 536         answer = self.gateway_client.send_command(command)
>>> >>     537         return_value = get_return_value(answer, self.gateway_client,
>>> >>     538                                         self.target_id, self.name)
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
>>> >>     367         if retry:
>>> >>     368             #print_exc()
>>> >> --> 369             response = self.send_command(command)
>>> >>     370         else:
>>> >>     371             response = ERROR
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry)
>>> >>     360         the Py4J protocol.
>>> >>     361         """
>>> >> --> 362         connection = self._get_connection()
>>> >>     363         try:
>>> >>     364             response = connection.send_command(command)
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _get_connection(self)
>>> >>     316             connection = self.deque.pop()
>>> >>     317         except Exception:
>>> >> --> 318             connection = self._create_connection()
>>> >>     319         return connection
>>> >>     320
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in _create_connection(self)
>>> >>     323         connection = GatewayConnection(self.address, self.port,
>>> >>     324             self.auto_close, self.gateway_property)
>>> >> --> 325         connection.start()
>>> >>     326         return connection
>>> >>     327
>>> >>
>>> >> /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in start(self)
>>> >>     430             'server'
>>> >>     431         logger.exception(msg)
>>> >> --> 432         raise Py4JNetworkError(msg)
>>> >>     433
>>> >>     434     def close(self):
>>> >>
>>> >> Py4JNetworkError: An error occurred while trying to connect to the Java server
>>> >
>>> > Is there any specific setting that I am missing that causes this error?
>>> >
>>> > Thanks and Regards,
>>> > Rogers Jeffrey L
