magistraleinformaticanetworking:spm:spm1213_c_ff_mixskepumap
- mixskepuffmap.cpp
#include <iostream> #include <ff/pipeline.hpp> #include <ff/farm.hpp> #include <math.h> #include <skepu/vector.h> #include <skepu/map.h> using namespace ff; using namespace std; int N = 10; class Source: public ff_node { public: Source(unsigned int streamlen):streamlen(streamlen) {} void * svc(void * task) { if(streamlen != 0) { skepu::Vector<float> * v = new skepu::Vector<float>(N,(float)streamlen); streamlen--; task = (void *) v; #ifdef DEBUG std::cout << "Source delivering:" << *v << std::endl; #endif } else { task = NULL; } return task; } private: unsigned int streamlen; }; class Drain: public ff_node { void * svc(void * task) { skepu::Vector<float> * v = (skepu::Vector<float> *) task; #ifdef DEBUG std::cout << "Drain got " << *v << std::endl; #endif return(GO_ON); } }; #define ITERNO 800000 UNARY_FUNC(iters, float, a, for(int _i=0; _i<ITERNO; _i++) a = sin(a); return(a); ) float f_iters(float a) { for(int _i=0; _i<ITERNO; _i++) a = sin(a); return(a); } // UNARY_FUNC(f, float, a, return(a+1); ) class MapStage:public ff_node { private: int tasks; int wno; public: MapStage(int wno):tasks(0),wno(wno) {} void svc_end() { cout << "SKEPU map stage " << wno << " computed " << tasks << " tasks " << e ndl; } void *svc(void * task) { tasks++; skepu::Vector<float> * v = (skepu::Vector<float> *) task; #ifdef DEBUG std::cout << "MapStage got: " << *v << std::endl; #endif skepu::Vector<float> * r = new skepu::Vector<float>(v->size()); skepu::Map<iters> skepumap(new iters); skepumap(*v, *r); #ifdef DEBUG std::cout << "MapStage delivering: " << *r << std::endl; #endif return((void *) r); } }; class SeqMapStage : public ff_node { private: int tasks; int wno; public: SeqMapStage(int wno):tasks(0),wno(wno) {} void svc_end() { cout << "CPU worker " << wno << " computed " << tasks << " tasks " << endl; } void * svc(void * task) { tasks++; skepu::Vector<float> * v = (skepu::Vector<float> *) task; skepu::Vector<float> * r = new skepu::Vector<float>(v->size()); for(int i=0; i<v->size(); i++) (*r)[i] = f_iters((*v)[i]); return((void *) r); } }; class Emitter : public ff_node { void * svc (void * t) { return t; } }; class Collector : public ff_node { void * svc (void * t) { return t; } }; int main(int argc, char * argv[]) { if (argc==1) { std::cerr << "use: " << argv[0] << " streamlen veclen nw [skepuworkers]\n"; return -1; } int m = atoi(argv[1]); N = atoi(argv[2]); int nw = atoi(argv[3]); int skepuworker; if(argc>4) skepuworker=atoi(argv[4]); else skepuworker = 0; cout << "Using " << nw << " ff workers and " << skepuworker << " Skepu workers " << endl; // bild a 2-stage pipeline ff_pipeline pipe; pipe.add_stage(new Source(m)); // pipe.add_stage(new MapStage()); ff_farm<> farm; std::vector<ff_node *> w; if(skepuworker!=0) for(int i=0; i<skepuworker; i++) w.push_back(new MapStage(i)); // one stage on SKEPU for(int i=0; i<nw; i++) w.push_back(new SeqMapStage(i)); farm.set_scheduling_ondemand(); farm.add_workers(w); farm.add_collector(new Collector()); farm.add_emitter(new Emitter()); pipe.add_stage(&farm); pipe.add_stage(new Drain()); ffTime(START_TIME); if (pipe.run_and_wait_end()<0) { error("running pipeline\n"); return -1; } ffTime(STOP_TIME); std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n"; std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n"; pipe.ffStats(std::cerr); return 0; }
magistraleinformaticanetworking/spm/spm1213_c_ff_mixskepumap.txt · Ultima modifica: 30/04/2013 alle 16:57 (12 anni fa) da Marco Danelutto