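// Compile with:
//   nvcc -I fastflowInclude <this file>
// where fastflowInclude is the directory that contains the FastFlow headers
// (the code includes <ff/pipeline.hpp>).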
#include <iostream>
#include <stdlib.h>
#include <time.h>

#include <ff/pipeline.hpp>
using namespace ff;

/*
 * map module running the map of function float f(float)
 * over a float vector whose size is known on a GPU through CUDA
 */

#include <cuda.h>
#include <cuda_runtime.h>

/*
 * Launch configuration for map1f. The kernel uses a grid-stride loop, so any
 * grid size is correct; the block count is only clamped to stay within the
 * smallest x-dimension grid limit of older devices.
 */
static const int MAP_THREADS_PER_BLOCK = 256;
static const int MAP_MAX_BLOCKS        = 65535;

/*
 * sample map function
 *
 * Applies sinf() 8000000 times to x and returns (original x) + 1 + (iterated x).
 * The final sum is evaluated in double (1.0 is a double literal) and rounded
 * once on return; that cost is negligible next to the loop.
 */
__device__ float f(float x) {
    float old = x;
    for (int i = 0; i < 8000000; i++) {
        x = sinf(x);
    }
    return (old + 1.0 + x);
}

/*
 * Second sample function: applies sinf() 8000000 times to x and returns the
 * iterated value. (The original fell off the end of a non-void function.)
 */
__device__ float g(float x) {
    for (int i = 0; i < 8000000; i++)
        x = sinf(x);
    return x;
}

/*
 * map kernel: ris[i] = f(task[i]) for every i in [0, n).
 *
 * Layout: 1D grid of 1D blocks, any size (grid-stride loop). No shared
 * memory. task and ris must be device pointers to at least n floats and must
 * not overlap.
 */
// alternative signature taking the function to apply as a parameter:
// __global__ void map1f(float *task, float *ris, int n, float (*f)(float))
__global__ void map1f(const float * __restrict__ task, float * __restrict__ ris, int n) {
    const long long stride = (long long) gridDim.x * blockDim.x;
    for (long long i = (long long) blockIdx.x * blockDim.x + threadIdx.x; i < n; i += stride)
        ris[i] = f(task[i]);
}

/* A stream item: a vector v of l floats. */
typedef struct _task {
    float * v;
    int     l;
} TASK;

#define ERROR(s) {std::cerr << s << std::endl; return(NULL); }

/* Releases a TASK and the vector it owns (both allocated with new). */
static void free_task(TASK * t) {
    if (t == NULL) return;
    delete [] t->v;
    delete t;
}

/* Owns a TASK until detach() is called; frees it on scope exit otherwise. */
class TaskOwner {
private:
    TASK * t;
    TaskOwner(const TaskOwner &);               // non-copyable
    TaskOwner & operator=(const TaskOwner &);
public:
    explicit TaskOwner(TASK * task) : t(task) {}
    ~TaskOwner() { free_task(t); }
    TASK * get() const { return t; }
    TASK * detach() { TASK * r = t; t = NULL; return r; }
};

/* Owns one device buffer of floats; frees it on scope exit if still held. */
class DeviceVector {
private:
    float * p;
    DeviceVector(const DeviceVector &);         // non-copyable
    DeviceVector & operator=(const DeviceVector &);
public:
    DeviceVector() : p(NULL) {}
    ~DeviceVector() { if (p != NULL) cudaFree(p); }
    cudaError_t allocate(size_t bytes) { return cudaMalloc((void **) &p, bytes); }
    /* Explicit release so that the caller can check the result. */
    cudaError_t release() { cudaError_t e = cudaFree(p); p = NULL; return e; }
    float * get() const { return p; }
};

#ifdef TIME
/* Microseconds elapsed between two timespec samples. */
static long elapsed_usecs(const struct timespec & from, const struct timespec & to) {
    return (to.tv_sec - from.tv_sec) * 1000000L + (to.tv_nsec - from.tv_nsec) / 1000;
}
#endif

/*
 * First stage: emits sl vectors of n floats each, filled with 0, 1, ..., n-1.
 * Ownership of every emitted TASK passes to the receiving stage.
 */
class GenStream: public ff_node {
private:
    int sl;     // number of vectors still to be sent
    int n;      // length of each vector
public:
    GenStream(int mm, int nn) : sl(mm), n(nn) {
#ifdef DEBUG
        std::cout << "GenStream intialized: sl = " << sl << " n= " << n << std::endl;
#endif
    }

    /* Sends all the vectors, then returns NULL. */
    void * svc(void * task) {
        while (sl-- > 0) {
#ifdef DEBUG
            // sleep(1);
#endif
            float * x = new float[n];
            for (int i = 0; i < n; i++)
                x[i] = i;
            TASK * t = new TASK();
            t->v = x;
            t->l = n;
#ifdef DEBUG
            std::cout << "DRAIN: going to deliver vector ";
            for (int i = 0; i < (t->l); i++)
                std::cout << (t->v)[i] << " ";
            std::cout << std::endl;
#endif
            ff_send_out((void *) t);
        }
        return (NULL);
    }
};

/*
 * Last stage: counts the results received, releases them, and prints the
 * count when the stage ends.
 */
class Drain: public ff_node {
private:
    int computed;
public:
    Drain() : computed(0) {}

    int svc_init() {
        computed = 0;
        return 0;       // the original had no return statement here
    }

    void svc_end() {
        std::cout << "Drain received " << computed << std::endl;
    }

    void * svc(void * t) {
        TASK * tt = (TASK *) t;
        computed++;
#ifdef DEBUG
        std::cout << "Received result ";
        if (t != NULL)
            for (int i = 0; i < (tt->l); i++)
                std::cout << (tt->v)[i] << " ";
        std::cout << std::endl;
#endif
        free_task(tt);  // end of the pipeline: nobody else will release it
        return (GO_ON);
    }
};

/*
 * Middle stage: offloads each incoming vector to the GPU, runs map1f on it
 * and forwards a new TASK holding the result.
 *
 * The function pointer given to the constructor is stored but not used:
 * map1f always applies the device function f defined above.
 */
class Mapper: public ff_node {
    float (*f)(float);
public:
    Mapper(float (*ff)(float)) { f = ff; }

    /*
     * Consumes (and frees) the incoming TASK and returns a newly allocated
     * TASK of the same length, or NULL after printing a message if a CUDA
     * call fails. All host and device buffers are released on every path.
     */
    void * svc(void * task) {
        TaskOwner in((TASK *) task);
        TASK * t = in.get();
        int n = (t->l);
#ifdef DEBUG
        std::cout << "Mapper got a task ";
        for (int i = 0; i < (t->l); i++)
            std::cout << (t->v)[i] << " ";
        std::cout << std::endl;
        std::cout << "MAPPER: going to compute n=" << n << std::endl;
#endif
        float * v = t->v;

        // Result task; new TASK() value-initialises v to NULL so the owner
        // is safe even if the vector allocation below throws.
        TaskOwner out(new TASK());
        TASK * res = out.get();
        res->v = new float[n]();    // zero-filled, like the original calloc
        res->l = n;
        float * r = res->v;

        const size_t dims = (size_t) n * sizeof(float);

#ifdef TIME
        // Wall-clock time: the thread mostly waits for the device, which a
        // CPU-time clock is not guaranteed to account for.
        struct timespec t_start, t_copied, t_kernel, t_done;
        clock_gettime(CLOCK_MONOTONIC, &t_start);
#endif
        DeviceVector d_v;
        if (d_v.allocate(dims) != cudaSuccess)
            ERROR("Allocating v");
        DeviceVector d_r;
        if (d_r.allocate(dims) != cudaSuccess)
            ERROR("Allocating r");
        if (cudaMemcpy(d_v.get(), v, dims, cudaMemcpyHostToDevice) != cudaSuccess)
            ERROR("Copying v to gpu");
#ifdef TIME
        clock_gettime(CLOCK_MONOTONIC, &t_copied);
        long el = elapsed_usecs(t_start, t_copied);
        std::cout << "spent total " << el << " usecs to offload data to GPU " << std::endl;
#endif
        // Enough blocks to give each element its own thread, clamped to
        // [1, MAP_MAX_BLOCKS]; the grid-stride loop covers the remainder.
        long long needed = ((long long) n + MAP_THREADS_PER_BLOCK - 1) / MAP_THREADS_PER_BLOCK;
        int blocks = 1;
        if (needed > MAP_MAX_BLOCKS)
            blocks = MAP_MAX_BLOCKS;
        else if (needed > 1)
            blocks = (int) needed;

        map1f<<<blocks, MAP_THREADS_PER_BLOCK>>>(d_v.get(), d_r.get(), n);
        if (cudaGetLastError() != cudaSuccess)
            ERROR("computing map");
#ifdef TIME
        // The launch is asynchronous: wait for the kernel so that the
        // interval below covers its execution and not just the launch.
        if (cudaDeviceSynchronize() != cudaSuccess)
            ERROR("computing map");
        clock_gettime(CLOCK_MONOTONIC, &t_kernel);
        el = elapsed_usecs(t_copied, t_kernel);
        std::cout << "spent total " << el << " usecs for on the GPU for the kernel execution " << std::endl;
#endif
        // Blocking copy: also reports errors raised while the kernel ran.
        if (cudaMemcpy(r, d_r.get(), dims, cudaMemcpyDeviceToHost) != cudaSuccess)
            ERROR("Copying back r");
        if (d_v.release() != cudaSuccess)
            ERROR("Releasing v");
        if (d_r.release() != cudaSuccess)
            ERROR("Releasing r");
#ifdef TIME
        clock_gettime(CLOCK_MONOTONIC, &t_done);
        el = elapsed_usecs(t_start, t_done);
        std::cout << "spent total " << el << " usecs total for on the GPU for 1 map " << std::endl;
#endif
#ifdef DEBUG
        std::cout << "Mapper going to deliver result of len " << res->l << " ! \n";
        for (int i = 0; i < (res->l); i++)
            std::cout << (res->v)[i] << " ";
        std::cout << std::endl;
#endif
        return ((void *) out.detach());
    }

    /*
     * Resets the device once, when the stage ends, instead of after every
     * task (which forced the device to be re-initialised for the next one).
     */
    void svc_end() {
        if (cudaDeviceReset() != cudaSuccess)
            std::cerr << "ERROR: Resetting device" << std::endl;
    }
};

//
// this is the sample test main
//
// usage: <program> n m
//   n  number of elements in the vectors
//   m  number of vectors in the input stream
//
int main(int argc, char * argv[]) {
    if (argc < 3) {
        std::cerr << "Usage: " << (argc > 0 ? argv[0] : "map") << " n m" << std::endl;
        return (1);
    }
    int n = atoi(argv[1]);  /* number of elements in the vectors */
    int m = atoi(argv[2]);  /* number of vectors in the input stream */
    if (n < 0 || m < 0) {
        std::cerr << "n and m must be non-negative" << std::endl;
        return (1);
    }

#ifdef TIME
    struct timespec res;
    clock_getres(CLOCK_MONOTONIC, &res);
    std::cout << "time resolution is " << res.tv_nsec << " nsecs" << std::endl;
#endif

    /* set up the program */
    ff_pipeline pgm;
    pgm.add_stage(new GenStream(m, n));
    pgm.add_stage(new Mapper(g));   // the function argument is currently ignored by Mapper
    pgm.add_stage(new Drain());

    if (pgm.run_and_wait_end() < 0) {
        std::cerr << "running pipeline" << std::endl;
        return (1);
    }

    return (0);
}