Wow, that's quite a bit of work... though I must admit that on first inspection of the kernel code it was evident that, as the kernel goes up and down the tree, there is a great deal of waiting.
Anyhow, I don't quite have the energy to get another version working in the near future. I'm leaving it to the community to use the code or not.

Regards
Nithin

@Andreas: I've attached an improved version with a few bug fixes.

On 22 February 2011 23:24, Bryan Catanzaro <catan...@eecs.berkeley.edu> wrote:
> There has been lots of work on Parallel Prefix Sum on GPUs. The
> difference between implementations of Inclusive and Exclusive prefix
> sum is very minor, and both can be implemented efficiently.
> Focusing on work efficiency for GPUs is actually a distracting red
> herring, especially when you're using a fully parallel, recursive
> prefix sum, as you are doing. This is for two reasons:
> 1. GPUs are SIMD machines, so work efficiency as counted by the PRAM
> model does not always correlate to saving work on the actual hardware.
> 2. The work involved in synchronizing and coordinating threads is not
> counted as "work" in traditional analyses. On GPUs, floating-point
> operations are essentially free. Communication and coordination
> between parallel threads, however, is very expensive. Instead of
> doing a fully parallel, recursive prefix sum, it's much more
> efficient to sequentialize the sum as much as possible, expressing
> only enough parallelism to fill the chip.
>
> It's also advantageous to take advantage of commutativity - have you
> defined whether your scan implementation accepts associative but
> non-commutative operators? If you require operators to be both
> associative and commutative, you can improve performance
> significantly. I wrote up some things about reductions (which are
> related to scan) here:
> http://developer.amd.com/documentation/articles/pages/opencl-optimization-case-study-simple-reductions.aspx
>
> As you can see, the "2-phase" reduction kernels are much faster than
> the "recursive multi-phase" kernels. Your implementation uses the
> recursive multi-phase approach. There are two analogues of the
> 2-phase reduction kernel for scan on an n element array with p
> processors (thread blocks), both of which require exactly 3 kernel
> launches, regardless of n or p:
>
> * Reduce, Scan, Scan: Divide the input into blocks of n/p elements.
> Perform reductions on these blocks in parallel, writing out the sum of
> each block to a temporary array. Perform a scan on the temporary
> array (this should only require 1 thread block and one kernel launch,
> since there are only p elements in the temporary array). Perform a
> scan on the input, this time starting each block with the appropriate
> result from the scan of the temporary array.
> * Scan, Scan, Add: Scan the blocks in parallel and write the results out.
> Perform a scan on the first result from each block (this should only
> require 1 thread block and one kernel launch, since there are only p
> blocks). Broadcast the appropriate result from the partial scan and
> perform vector addition in each block.
>
> If you sequentialize these operations as much as possible, and do
> parallel reduction/scan trees only when absolutely necessary, you will
> save a lot of synchronization and communication cost. You can take a
> look at Thrust [1] to see an example of how this can be done.
>
> Best of luck,
> bryan
>
> [1] Thrust: http://code.google.com/p/thrust
>
>
> On Tue, Feb 22, 2011 at 1:47 AM, nithin s <nithin19...@gmail.com> wrote:
>> Hi Andreas
>>
>> On 22 February 2011 05:45, Andreas Kloeckner <li...@informa.tiker.net> wrote:
>>> Hi Nithin,
>>>
>>> two requests:
>>>
>>> - can you please resend this as an attachment?
>>> It's hard to fish it out of the text of an email.
>> Done
>>>
>>> - please avoid using floating point functions (log, ceil, floor) in
>>> integer contexts. PyCUDA comes with a bitlog2 function that does what
>>> you need, I think.
>>
>> bitlog2 alone doesn't cut it, because the log is taken to the base
>> 2*block_size, and block_size need not be a power of 2 in a few rare
>> cases: if shared memory is limited, then block_size = shared_mem/item_size,
>> and item_size need not be a power of 2 (if we are willing to support
>> arbitrary types... though there is a limitation, since the dtype needs to
>> be known for the partial sum array allocations, and that is presumably a
>> numpy.dtype).
>>
>> This would mess up the estimate. I could recode this as a routine that
>> repeatedly divides and takes the necessary integer ceiling, but I feel
>> the current expression is cleaner and more concise. Let me know if you
>> still feel otherwise.
>>
>>>
>>> Once I get the file posted on the PyCUDA branch, I'll write a more
>>> complete review. I agree with your assessment of inclusive vs exclusive
>>> scan. I'd say go ahead and kill the inclusive version.
>>>
>>> Tomasz, what's your take here?
>>>
>>> Andreas
>>>
>>
>> Regards
>>
>> Nithin
>>
>>
>> @Bryan: Tomasz's original inclusive scan implementation was based on the
>> naive prefix scan algorithm at http://en.wikipedia.org/wiki/Prefix_sum,
>> which is not particularly work efficient. I don't (yet) see a neat way to
>> convert the exclusive Mark Harris version to an inclusive one, so I
>> thought it better to maintain a single exclusive prefix scan.
>>
>> _______________________________________________
>> PyCUDA mailing list
>> PyCUDA@tiker.net
>> http://lists.tiker.net/listinfo/pycuda
>>
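
For concreteness, here is a minimal host-side NumPy sketch of the "Reduce, Scan, Scan" decomposition Bryan describes above. It is illustrative only: the function and variable names are made up, and each loop over blocks stands in for one kernel launch whose blocks would run in parallel on the GPU.

# Illustration only: host-side sketch of a three-phase exclusive sum-scan.
import numpy as np

def reduce_scan_scan(x, num_blocks):
    n = len(x)
    bounds = [(i * n) // num_blocks for i in range(num_blocks + 1)]
    out = np.empty_like(x)

    # Phase 1 (Reduce): one reduction per block, writing out the block sums.
    block_sums = np.array([x[bounds[b]:bounds[b+1]].sum()
                           for b in range(num_blocks)])

    # Phase 2 (Scan): exclusive scan of the p block sums; a single block suffices.
    block_offsets = np.concatenate(([0], np.cumsum(block_sums)[:-1]))

    # Phase 3 (Scan): exclusive scan of each block, seeded with its offset.
    for b in range(num_blocks):
        running = block_offsets[b]
        for i in range(bounds[b], bounds[b+1]):
            out[i] = running
            running += x[i]
    return out

x = np.random.randint(0, 10, size=1000)
assert (reduce_scan_scan(x, num_blocks=8) ==
        np.concatenate(([0], np.cumsum(x)[:-1]))).all()

The attached implementation below, by contrast, follows the recursive multi-phase structure that Bryan refers to.
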
from pycuda import driver, compiler, gpuarray, tools, reduction
from warnings import warn

exclusive_scan_source = """
%(preamble)s

#define SUM(a, b) (%(scan_operation)s)

__device__ unsigned int %(name)s_next_power_of_2(unsigned int v)
{
    v -= 1;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    return v + 1;
}

extern "C" {
__global__ void %(name)s(const %(data_type)s *i_data, %(data_type)s *o_data
    %(if_part_sum)s ,%(data_type)s *partial_sum
    ,const int n
    %(if_tail)s ,const int block_index
    )
{
    extern __shared__ %(data_type)s shared_array[];

    int source_node_index = threadIdx.x;

    // Size of shared array that can contain input data
    %(if_main)s const int full_array_size = n;
    %(if_tail)s const int full_array_size = %(name)s_next_power_of_2(n);

    %(if_main)s const int block_index = blockIdx.x;
    const int block_offset = 2*block_index*blockDim.x;

    int tree_node_distance = 1;
    int target_node_index = threadIdx.x + (n/2);

    %(if_tail)s if (source_node_index < n)
    {
        shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] =
            i_data[source_node_index + block_offset];
    }
    %(if_tail)s else { shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = %(neutral)s; }

    %(if_tail)s if (target_node_index < n)
    {
        shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] =
            i_data[target_node_index + block_offset];
    }
    %(if_tail)s else { shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = %(neutral)s; }

    // Travel upwards, from the leaves to the root of the tree.
    // During each iteration the distance between nodes doubles
    // and the number of nodes halves.
    for (int number_of_nodes = full_array_size>>1; number_of_nodes > 0; number_of_nodes >>= 1)
    {
        __syncthreads();
        if (threadIdx.x < number_of_nodes)
        {
            int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
            int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
            source_node_index += SHARED_BANK_PADDING(source_node_index);
            target_node_index += SHARED_BANK_PADDING(target_node_index);
            shared_array[target_node_index] =
                SUM(shared_array[target_node_index], shared_array[source_node_index]);
        }
        tree_node_distance <<= 1;
    }

    if (threadIdx.x == 0)
    {
        %(if_part_sum)s partial_sum[block_index] = shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)];
        shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s;
    }

    // Travel downwards, from the root to the leaves.
    // During each iteration the number of nodes doubles
    // and the distance between nodes halves.
    for (int number_of_nodes = 1; number_of_nodes < full_array_size; number_of_nodes <<= 1)
    {
        tree_node_distance >>= 1;
        __syncthreads();
        if (threadIdx.x < number_of_nodes)
        {
            int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
            int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
            source_node_index += SHARED_BANK_PADDING(source_node_index);
            target_node_index += SHARED_BANK_PADDING(target_node_index);
            %(data_type)s temp = shared_array[source_node_index];
            shared_array[source_node_index] = shared_array[target_node_index];
            shared_array[target_node_index] = SUM(shared_array[target_node_index], temp);
        }
    }
    __syncthreads();

    %(if_tail)s if (source_node_index < n)
    {
        o_data[source_node_index + block_offset] =
            shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)];
    }
    %(if_tail)s if (target_node_index < n)
    {
        o_data[target_node_index + block_offset] =
            shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)];
    }
}
}
"""

uniform_sum_source = """
%(preamble)s
#define SUM(a, b) (%(scan_operation)s)

extern "C" {
__global__ void %(name)s(%(data_type)s *o_data, %(data_type)s *partial_sum
    ,const int n
    %(if_tail)s ,const int block_index
    )
{
    extern __shared__ %(data_type)s shared_array[];

    int source_node_index = threadIdx.x;

    // Block 0 needs no uniform add, so the main kernel starts at block 1
    %(if_main)s const int block_index = blockIdx.x+1;
    const int block_offset = 2*block_index*blockDim.x;

    int target_node_index = threadIdx.x + ((n==1)?(1):(n/2));

    if (threadIdx.x == 0)
    {
        shared_array[0] = partial_sum[block_index];
    }
    __syncthreads();
    %(data_type)s prev_block_sum = shared_array[0];

    %(if_tail)s if (source_node_index < n)
    {
        o_data[source_node_index + block_offset] =
            SUM(prev_block_sum, o_data[source_node_index + block_offset]);
    }
    %(if_tail)s if (target_node_index < n)
    {
        o_data[target_node_index + block_offset] =
            SUM(prev_block_sum, o_data[target_node_index + block_offset]);
    }
}
}
"""


def _div_ceil(nr, dr):
    return int(int(nr + dr - 1)/int(dr))


def get_num_levels(n, c):
    l = 0
    while(n != 1):
        l += 1
        n = _div_ceil(n, c)
    return l


def get_num_chunks(n, c, i):
    return _div_ceil(n, pow(c, i))


# TODO: check this four .. is it future proof??
padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n"
#padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n"


class ExclusivePrefixKernel(object):
    def __init__(self, data_type, scan_operation, neutral, item_size=None,
                 keep=False, options=[], preamble='', name='prefix_kernel'):
        self.item_size = item_size
        if self.item_size is None:
            # Determine the size of the used data type
            self.numpy_type = tools.parse_c_arg("const %s * in" % data_type).dtype
            self.item_size = self.numpy_type.itemsize

        # Determine the number of threads
        dev = driver.Context.get_device()
        block_size = dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK)
        block_dimension = dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X)
        self.block_size = min(block_size, block_dimension)

        # Shared memory size: two items per thread, number of threads,
        # size of one data item
        self.shared_size = self.get_chunk_size()*self.item_size
        # Padding to avoid bank conflicts
        # TODO: is this always 4??
        log_num_banks = 4
        self.shared_size += ((self.get_chunk_size() >> log_num_banks) * self.item_size)

        # Check whether we do not exceed available shared memory size
        max_shared_size = dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK)
        if self.shared_size > max_shared_size:
            warn("Changed shared size")
            self.shared_size = max_shared_size
            self.block_size = self.shared_size/(2*self.item_size)

        self.main_function = self.get_main_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.main_function.prepare("PPi", block=(self.block_size, 1, 1),
            shared=self.shared_size)

        self.tail_function = self.get_tail_function(data_type, scan_operation,
            neutral, name, keep, options, preamble)
        self.tail_function.prepare("PPii", block=(self.block_size, 1, 1),
            shared=self.shared_size)

        self.main_part_sum_function = self.get_main_part_sum_function(data_type,
            scan_operation, neutral, name, keep, options, preamble)
        self.main_part_sum_function.prepare("PPPi", block=(self.block_size, 1, 1),
            shared=self.shared_size)

        self.tail_part_sum_function = self.get_tail_part_sum_function(data_type,
            scan_operation, neutral, name, keep, options, preamble)
        self.tail_part_sum_function.prepare("PPPii", block=(self.block_size, 1, 1),
            shared=self.shared_size)

        self.main_uniform_sum_function = self.get_main_uniform_sum_function(data_type,
            scan_operation, neutral, name, keep, options, preamble)
        self.main_uniform_sum_function.prepare("PPi", block=(self.block_size, 1, 1),
            shared=self.item_size)

        self.tail_uniform_sum_function = self.get_tail_uniform_sum_function(data_type,
            scan_operation, neutral, name, keep, options, preamble)
        self.tail_uniform_sum_function.prepare("PPii", block=(self.block_size, 1, 1),
            shared=self.item_size)

        # Use maximum available shared memory in 2.x devices
        # TODO: is it needed as we are more limited by number of threads?
        # Might be needed for large data types (?)
        if dev.compute_capability() >= (2, 0):
            cache_config = driver.func_cache.PREFER_SHARED
            self.main_function.set_cache_config(cache_config)
            self.tail_function.set_cache_config(cache_config)
            self.main_part_sum_function.set_cache_config(cache_config)
            self.tail_part_sum_function.set_cache_config(cache_config)

    def get_chunk_size(self):
        # Each thread handles two items
        return 2 * self.block_size

    def get_main_function(self, data_type, scan_operation, neutral,
            name='prefix_kernel', keep=False, options=[], preamble=''):
        src = exclusive_scan_source % {
            'data_type': data_type,
            'name': name,
            'neutral': neutral,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "//",
            'if_main': "",
            'if_part_sum': "//",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def get_tail_function(self, data_type, scan_operation, neutral,
            name='prefix_kernel', keep=False, options=[], preamble=''):
        src = exclusive_scan_source % {
            'data_type': data_type,
            'name': name,
            'neutral': neutral,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "",
            'if_main': "//",
            'if_part_sum': "//",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def get_main_part_sum_function(self, data_type, scan_operation, neutral,
            name='prefix_kernel', keep=False, options=[], preamble=''):
        src = exclusive_scan_source % {
            'data_type': data_type,
            'name': name,
            'neutral': neutral,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "//",
            'if_main': "",
            'if_part_sum': "",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def get_tail_part_sum_function(self, data_type, scan_operation, neutral,
            name='prefix_kernel', keep=False, options=[], preamble=''):
        src = exclusive_scan_source % {
            'data_type': data_type,
            'name': name,
            'neutral': neutral,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "",
            'if_main': "//",
            'if_part_sum': "",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def get_main_uniform_sum_function(self, data_type, scan_operation, neutral,
            name='uniform_add_kernel', keep=False, options=[], preamble=''):
        src = uniform_sum_source % {
            'data_type': data_type,
            'name': name,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "//",
            'if_main': "",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral,
            name='uniform_add_kernel', keep=False, options=[], preamble=''):
        src = uniform_sum_source % {
            'data_type': data_type,
            'name': name,
            'scan_operation': scan_operation,
            'preamble': preamble+padding_preamble,
            'if_tail': "",
            'if_main': "//",
        }
        return compiler.SourceModule(src, options=options, keep=keep,
            no_extern_c=True).get_function(name)

    def call_final(self, input_size, i_data, o_data):
        block_count = _div_ceil(input_size, self.get_chunk_size())
        if input_size != block_count * self.get_chunk_size():
            if block_count > 1:
                self.main_function.prepared_call((block_count-1, 1),
                    i_data, o_data, self.get_chunk_size())
            self.tail_function.prepared_call((1, 1), i_data, o_data,
                input_size - (block_count - 1) * self.get_chunk_size(),
                (block_count - 1))
        else:
            self.main_function.prepared_call((block_count, 1),
                i_data, o_data, self.get_chunk_size())
    def call_intermediate(self, input_size, i_data, o_data, part_sum_buf):
        block_count = _div_ceil(input_size, self.get_chunk_size())
        if input_size != block_count * self.get_chunk_size():
            if block_count > 1:
                self.main_part_sum_function.prepared_call((block_count-1, 1),
                    i_data, o_data, part_sum_buf, self.get_chunk_size())
            self.tail_part_sum_function.prepared_call((1, 1), i_data, o_data,
                part_sum_buf,
                input_size - (block_count - 1) * self.get_chunk_size(),
                (block_count - 1))
        else:
            self.main_part_sum_function.prepared_call((block_count, 1),
                i_data, o_data, part_sum_buf, self.get_chunk_size())

    def call_uniform_add(self, input_size, i_data, o_data, part_sum_buf):
        block_count = _div_ceil(input_size, self.get_chunk_size())
        assert block_count > 1
        if input_size != block_count * self.get_chunk_size():
            block_count -= 1
            if block_count > 1:
                self.main_uniform_sum_function.prepared_call((block_count-1, 1),
                    o_data, part_sum_buf, self.get_chunk_size())
            block_count += 1
            self.tail_uniform_sum_function.prepared_call((1, 1), o_data, part_sum_buf,
                input_size - (block_count - 1) * self.get_chunk_size(),
                (block_count - 1))
        else:
            block_count -= 1
            self.main_uniform_sum_function.prepared_call((block_count, 1),
                o_data, part_sum_buf, self.get_chunk_size())

    def __call__(self, *args, **kwargs):
        i_data = kwargs.get('i_data')
        if i_data is None:
            i_data = args[0]
        o_data = kwargs.get('o_data')
        if o_data is None:
            o_data = args[1]
        n = kwargs.get('n')
        if n is None:
            n = min(i_data.size, o_data.size)

        # One partial-sum buffer per recursion level; each level holds the
        # block sums of the level below it.
        num_levels = get_num_levels(n, self.get_chunk_size())
        part_sum_buf_szs = [get_num_chunks(n, self.get_chunk_size(), l)
                            for l in range(1, num_levels)]
        part_sum_bufs = [driver.mem_alloc(sz*self.item_size)
                         for sz in part_sum_buf_szs]

        # One (size, input, output) argument set per level; every level except
        # the last also gets the partial-sum buffer it writes its block sums into.
        callargsets = [[n, i_data.gpudata, o_data.gpudata]] + \
            [[sz, ps_buf, ps_buf] for sz, ps_buf in zip(part_sum_buf_szs, part_sum_bufs)]
        for i, ps_buf in enumerate(part_sum_bufs):
            callargsets[i] += [ps_buf]

        # Scan each level, writing block sums to the next coarser level ...
        for callargset in callargsets[0:-1]:
            self.call_intermediate(*callargset)
        # ... scan the coarsest level in place ...
        self.call_final(*callargsets[-1])
        # ... then propagate the per-block offsets back down.
        for callargset in reversed(callargsets[0:-1]):
            self.call_uniform_add(*callargset)


if __name__ == '__main__':
    from pycuda import driver, compiler, gpuarray, tools
    from scan import *
    import numpy as np
    import pycuda.autoinit

    # Sample usage for vector types and, in general, arbitrary types.
    # Note: the size in bytes needs to be specified for non-trivial types.
    n = 1024*1024
    i_data = gpuarray.empty([n, 3], dtype=np.int32)
    i_data.fill(np.int32(1))
    preamble = """
    inline __device__ int3 operator+(int3 a, int3 b)
    {
        return make_int3(a.x + b.x, a.y + b.y, a.z + b.z);
    }
    """
    scan_kern = ExclusivePrefixKernel('int3', 'a+b', 'make_int3(0,0,0)',
        item_size=4*3, preamble=preamble)
    print i_data.get()
    scan_kern(i_data, i_data, n=n)
    print i_data.get()
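
A further usage sketch along the lines of the __main__ block above, for a plain scalar type, checked against numpy.cumsum. It assumes the attachment is saved and importable as "scan" (as the __main__ block already does); the variable names here are only illustrative.

import numpy as np
import pycuda.autoinit
from pycuda import gpuarray
from scan import ExclusivePrefixKernel

n = 1 << 20
host = np.random.randint(0, 100, size=n).astype(np.int32)
dev_in = gpuarray.to_gpu(host)
dev_out = gpuarray.empty_like(dev_in)

# Exclusive prefix sum over int32, with 0 as the neutral element
scan_kern = ExclusivePrefixKernel('int', 'a+b', '0')
scan_kern(dev_in, dev_out, n=n)

expected = np.concatenate(([0], np.cumsum(host)[:-1])).astype(np.int32)
print (dev_out.get() == expected).all()
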
_______________________________________________
PyCUDA mailing list
PyCUDA@tiker.net
http://lists.tiker.net/listinfo/pycuda