/*
 * Code to be compiled with:
 *
 *   nvcc -I fastflowInclude mapcuda.cu
 */
#include <iostream>
#include <math.h>
#include <stdlib.h>
#include <time.h>
 
#include <ff/pipeline.hpp>
 
using namespace ff; 
/*
 * Map module: applies the function float f(float) to every element of a
 * float vector of known size, running on a GPU through CUDA.
 */
 
#include <cuda.h>
#include <cuda_runtime.h>
 
/* 
 * sample map function
 */
/*
 * Applies sinf to x 8,000,000 times, then returns the original x plus 1
 * plus the final iterate. Compiled for host and device, so it can be used
 * by the map1f kernel and also called or addressed from host code.
 */
__host__ __device__ float f(float x) {
    float old = x;
    for(int i=0; i<8000000; i++) {
      x = sinf(x);
    }
    // 1.0f (not 1.0) keeps the sum in single precision; a bare 1.0 would
    // promote the whole expression to double.
    return(old+1.0f+x);
}
 
/*
 * Applies sinf to x 8,000,000 times and returns the final iterate.
 * Compiled for host and device: main() passes &g to Mapper from host code,
 * and taking the address of a __device__-only function there is not
 * supported.
 */
__host__ __device__ float g(float x) {
  for(int i=0; i<8000000; i++) 
    x = sinf(x);
  // The original had no return statement: flowing off the end of a
  // non-void function is undefined behavior.
  return x;
}
 
/* map kernel */
// __global__ void map1f(float *task, float *ris, int n, float (*f)(float)) {
/*
 * Computes ris[i] = f(task[i]) for every i in [0, n).
 *
 * Expects a 1-D grid of 1-D blocks, uses no shared memory, and runs on
 * whatever stream it is launched on. Each thread handles i,
 * i + gridDim.x*blockDim.x, ..., so every element is computed regardless of
 * the launch size. Indices are size_t so the stride arithmetic cannot
 * overflow an int for very large n.
 */
__global__ void map1f(float *task, float *ris, int n) {
  if(n <= 0) return;
  const size_t count  = (size_t) n;
  const size_t stride = (size_t) gridDim.x * blockDim.x;
  for(size_t i = (size_t) blockDim.x * blockIdx.x + threadIdx.x; i < count; i += stride)
    ris[i] = f(task[i]);
}
 
/*
 * Stream item passed between the pipeline stages: a heap-allocated float
 * vector together with its element count.
 */
typedef struct _task {
  float *v;   // vector elements
  int    l;   // number of elements pointed to by v
} TASK;
 
/* Writes s to std::cerr, then returns NULL from the enclosing function. */
#define ERROR(s) do { std::cerr << s << std::endl; return(NULL); } while (0)
 
/*
 * First pipeline stage. svc() emits sl TASKs, each holding a fresh vector
 * of n floats initialized to 0, 1, ..., n-1, and then returns NULL.
 * Each TASK is allocated with new and its vector with new[]; neither is
 * freed here (presumably owned by the downstream stage from then on).
 */
class GenStream: public ff_node {
private: 
  int sl;   // number of vectors still to be emitted
  int n;    // length of every emitted vector (clamped to >= 0)

public: 

  // mm = number of vectors to emit, nn = length of each vector.
  // A negative length is clamped to 0 so new float[n] cannot throw.
  GenStream(int mm, int nn) : sl(mm), n(nn < 0 ? 0 : nn) { 
#ifdef DEBUG
    std::cout << "GenStream initialized: sl = " << sl << " n= " << n << std::endl;
#endif
  }

  // Emits all remaining vectors through ff_send_out, then returns NULL.
  void * svc(void * task) {
    while(sl-- > 0) {
      float * x = new float[n]; 
      for(int i=0; i<n; i++) 
        x[i]=i;
      TASK * t = new TASK();
      t->v = x; 
      t->l = n; 
#ifdef DEBUG
      std::cout << "GENSTREAM: going to deliver vector ";
      for(int i=0; i<(t->l); i++) 
        std::cout << (t->v)[i] << " ";
      std::cout << std::endl; 
#endif
      ff_send_out((void *)t); 
    }
    return(NULL); 
  }
}; 
 
/*
 * Last pipeline stage: counts the result TASKs it receives, releases each
 * one, and prints the total when the stage ends.
 */
class Drain: public ff_node {
private: 
  int computed;   // number of tasks received so far
public:
  Drain() : computed(0) {}

  // Resets the counter. The original fell off the end of this non-void
  // function (undefined behavior); it now returns 0 explicitly
  // (presumably FastFlow's "init succeeded" value — confirm).
  int svc_init() {
    computed=0; 
    return 0;
  }

  void svc_end() {
    std::cout << "Drain received " << computed << std::endl; 
  }

  // Consumes one result. Results come from Mapper, which allocates the
  // TASK with new and its vector with calloc, so release them accordingly.
  void * svc(void * t) {
    TASK * tt = (TASK *)t;
    computed++;
#ifdef DEBUG
    std::cout << "Received result "; 
    if(t != NULL) 
      for(int i=0; i<(tt->l); i++) 
        std::cout << (tt->v)[i] << " "; 
    std::cout << std::endl;
#endif
    if(tt != NULL) {
      free(tt->v);
      delete tt;
    }
    return(GO_ON);
  }
};
 
/*
 * Middle pipeline stage. For each incoming TASK it copies the vector to the
 * GPU, runs map1f over it, copies the result back into a freshly calloc'd
 * vector and forwards it as a new TASK. The incoming TASK is released
 * once processed. On any failure it prints the failing step and returns
 * NULL, as the original did.
 */
class Mapper: public ff_node {
  // Caller-supplied map function. NOTE(review): stored but never used;
  // map1f always applies the global function f.
  float (*f)(float); 

  // Copies v[0..n) to the device, runs map1f into r[0..n) and copies the
  // result back. Device buffers are released on every path. Returns
  // cudaSuccess, or the failing call's error with *what naming the step.
  static cudaError_t mapOnGpu(const float * v, float * r, int n, const char ** what) {
    *what = NULL;
    // Nothing to compute; a 0-block launch would be an invalid configuration.
    if(n <= 0) return cudaSuccess;
    const int threads = 64;            // threads per block
    size_t dims = (size_t) n * sizeof(float);
    float * d_v = NULL; 
    float * d_r = NULL; 
    cudaError_t err = cudaSuccess;
#ifdef TIME
    struct timespec t0,t1,t2,t4;
    long el;
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t0);
#endif
    do {
      if((err = cudaMalloc((void **)&d_v,dims)) != cudaSuccess)                { *what = "Allocating v"; break; }
      if((err = cudaMalloc((void **)&d_r,dims)) != cudaSuccess)                { *what = "Allocating r"; break; }
      if((err = cudaMemcpy(d_v,v,dims,cudaMemcpyHostToDevice)) != cudaSuccess) { *what = "Copying v to gpu"; break; }
#ifdef TIME
      clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t1);
      el  = (t1.tv_sec - t0.tv_sec)*1000000L + (t1.tv_nsec - t0.tv_nsec) / 1000; 
      std::cout << "spent total " << el << " usecs to offload data to GPU " << std::endl;
#endif
      // Enough blocks to give every element a thread; written as
      // (n-1)/threads+1 so it cannot overflow for n close to INT_MAX.
      int blocks = (n - 1) / threads + 1;
      map1f<<<blocks,threads>>>(d_v,d_r,n);
      // Launch-configuration errors are reported here...
      if((err = cudaGetLastError()) != cudaSuccess)                            { *what = "computing map"; break; }
      // ...faults during execution only at the next synchronization. This
      // also makes the TIME measurement below cover the kernel run itself,
      // not just the asynchronous launch.
      if((err = cudaDeviceSynchronize()) != cudaSuccess)                       { *what = "computing map"; break; }
#ifdef TIME
      clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t2);
      el  = (t2.tv_sec - t1.tv_sec)*1000000L + (t2.tv_nsec - t1.tv_nsec) /1000; 
      std::cout << "spent total " << el << " usecs for on the GPU for the kernel execution  " << std::endl;
#endif
      if((err = cudaMemcpy(r,d_r,dims,cudaMemcpyDeviceToHost)) != cudaSuccess) { *what = "Copying back r"; break; }
    } while(0);
    // Always release device memory (cudaFree(NULL) is a no-op); a release
    // failure is reported only if nothing failed before it.
    cudaError_t ev = cudaFree(d_v);
    cudaError_t er = cudaFree(d_r);
    if(err == cudaSuccess && ev != cudaSuccess) { err = ev; *what = "Releasing v"; }
    if(err == cudaSuccess && er != cudaSuccess) { err = er; *what = "Releasing r"; }
#ifdef TIME
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &t4);
    el  = (t4.tv_sec - t0.tv_sec)*1000000L + (t4.tv_nsec - t0.tv_nsec) /1000; 
    std::cout << "spent total " << el << " usecs total for on the GPU for 1 map " << std::endl;
#endif
    return err;
  }

public:
  Mapper(float (*ff)(float)) {
    f = ff; 
  }

  void * svc(void* task) {
    TASK * t = (TASK *) task; 
    int n = (t->l);
#ifdef DEBUG
    std::cout << "Mapper got a task " ; 
    for(int i=0; i< (t->l); i++) 
      std::cout << (t->v)[i] << " ";
    std::cout << std::endl; 
    std::cout << "MAPPER: going to compute n=" << n << std::endl;
#endif
    float * v = t->v;
    // The result vector is calloc'd, so its consumer must release it with free().
    float * r = (float *) calloc(n > 0 ? (size_t) n : 0, sizeof(float));
    const char * what = NULL;
    cudaError_t err = cudaSuccess;
    if(n > 0 && r == NULL) {
      err  = cudaErrorMemoryAllocation;
      what = "Allocating result";
    } else {
      err = mapOnGpu(v, r, n, &what);
    }
    // The input has been consumed. It is released with delete[]/delete to
    // match how GenStream allocates it (new float[n] / new TASK()).
    delete [] v;
    delete t;
    if(err != cudaSuccess) {
      free(r);
      ERROR(what << ": " << cudaGetErrorString(err));
    }
    TASK * res = new TASK(); 
    res->v = r; 
    res->l = n; 
#ifdef DEBUG
    std::cout << "Mapper going to deliver result of len " << res->l << " ! "; 
    for(int i=0; i< (res->l); i++) 
      std::cout << (res->v)[i] << " ";
    std::cout << std::endl; 
#endif
    return((void *) res);
  }

  // Resets the device once, when the stage ends, rather than after every
  // task (which tore down and rebuilt the CUDA context per item).
  void svc_end() {
    if(cudaDeviceReset() != cudaSuccess)
      std::cerr << "Resetting device" << std::endl;
  }
};
 
//
// this is the sample test main 
//
int main(int argc, char * argv[]) {

  // Both sizes are required; the original read argv[1]/argv[2] unchecked.
  if(argc < 3) {
    std::cerr << "usage: " << (argc > 0 ? argv[0] : "mapcuda") << " n m" << std::endl
              << "  n = number of elements in the vectors" << std::endl
              << "  m = number of vectors in the input stream" << std::endl;
    return(1);
  }
  int n = atoi(argv[1]); /* number of elements in the vectors     */
  int m = atoi(argv[2]); /* number of vectors in the input stream */
  if(n < 0 || m < 0) {
    std::cerr << "n and m must be non-negative" << std::endl;
    return(1);
  }

#ifdef TIME
  struct timespec resolution;
  clock_getres(CLOCK_THREAD_CPUTIME_ID, &resolution);
  std::cout << "time resolution is " << resolution.tv_nsec << " nsecs" << std::endl;
#endif

  /* set up the program */ 
  ff_pipeline pgm; 

  pgm.add_stage(new GenStream(m,n)); 
  // NOTE(review): Mapper stores this host-side pointer but never calls it;
  // the kernel always applies f. If g is __device__-only, its host address
  // is not usable (CUDA Programming Guide, "Function Pointers").
  pgm.add_stage(new Mapper(g)); 
  pgm.add_stage(new Drain()); 

  // run_and_wait_end() reports failure with a negative return value.
  if(pgm.run_and_wait_end() < 0) {
    std::cerr << "running pipeline" << std::endl;
    return(1);
  }
  return(0);
}