// This is an old version of the document!
/* * Author: Massimo Torquati torquati [at] di [dot] unipi [dot] it * Date: December 2014 */
/* Map skeleton implementation with dynamic scheduling. * SPM Class Work 5. * */
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <utility>
#include <vector>

#include <ff/farm.hpp>
selective debugging, compile with -DDEBUG to enable debugging info #if defined(DEBUG) #define DBG(X) X #else #define DBG(X) #endif using namespace ff; this is the task struct task_t {
void set(long s, long e) { start=s,end=e; } long start, end;
};
compare function for pairs static inline bool data_cmp(const std::pair<long,task_t> &a,const std::pair<long,task_t> &b) { return a.first < b.first; } this is the scheduler class Scheduler: public ff_node_t<task_t> { protected:
// initialize the internal table (data vector) inline size_t init_data(long start, long stop) { const long numtasks = stop-start; long totalnumtasks = std::lrint(std::ceil(numtasks/(double)_chunk)); long tt = totalnumtasks; size_t ntxw = totalnumtasks / _nw; size_t r = totalnumtasks % _nw; // try to keep the n. of tasks per worker as smaller as possible if (ntxw == 0 && r>=1) { ntxw = 1, r = 0; } data.resize(_nw); taskv.resize(_nw);
long end, t=0, e; for(size_t i=0;i<_nw && totalnumtasks>0;++i, totalnumtasks-=t) { t = ntxw + ( (r>1 && (i<r)) ? 1 : 0 ); e = start + (t*_chunk - 1) + 1; end = (e<stop) ? e : stop; data[i].first=t; data[i].second.set(start,end); start = (end-1)+ 1; } if (totalnumtasks) { assert(totalnumtasks==1); // try to keep the n. of tasks per worker as smaller as possible if (ntxw > 1) data[_nw-1].first += totalnumtasks; else { --tt, _chunk*=2; } data[_nw-1].second.end = stop; } DBG(printf("init_data\n"); for(size_t i=0;i<_nw;++i) { printf("W=%ld %ld <%ld,%ld>\n", i, data[i].first, data[i].second.start, data[i].second.end); } printf("totaltasks=%ld\n", tt); ); return tt; } inline void fillTask(task_t &task, const int id) { long start = data[id].second.start; long end = std::min(start+_chunk, data[id].second.end); --data[id].first, (data[id].second).start = (end-1) + 1; task.set(start, end); }
public:
Scheduler(ff_loadbalancer* lb, long start, long stop, long chunk, int nw): lb(lb),_start(start),_stop(stop),_chunk(chunk),totaltasks(0),_nw(nw) { totaltasks = init_data(start,stop); assert(totaltasks>=1); } // get the next task if any bool nextTask(task_t &task, const int wid) { if (data[wid].first) { fillTask(task,wid); return true; } // no available task for the current thread long maxid = (std::max_element(data.begin(),data.end(),data_cmp) - data.begin()); if (data[maxid].first > 0) { fillTask(task,maxid); return true; } return false; }
inline task_t* svc(task_t* t) { if (t==nullptr) { size_t remaining = totaltasks; for(size_t wid=0;wid<_nw;++wid) { if (data[wid].first >0) { long start = data[wid].second.start; long end = std::min(start+_chunk, data[wid].second.end); taskv[wid].set(start, end); lb->ff_send_out_to(&taskv[wid], (int)wid); --remaining, --data[wid].first; (data[wid].second).start = (end-1)+1; } } return (remaining>0) ? GO_ON : EOS; } auto wid = lb->get_channel_id(); if (--totaltasks <=0) return EOS; if (nextTask(*t, (int)wid)) lb->ff_send_out_to(t, int(wid)); else lb->ff_send_out_to(EOS, (int)wid); return GO_ON; }
protected:
ff_loadbalancer *lb; long _start,_stop, _chunk; size_t totaltasks, _nw;
std::vector<std::pair<long, task_t> > data; // internal table of tasks std::vector<task_t> taskv; // tasks vector, avoiding dynanic memory allocation
};
// Generic worker node: applies the user-supplied callable F to the index
// range carried by each incoming task, then hands the same task object back
// as its result.
template<typename FUNC_t>
class Worker: public ff_node_t<task_t> {
public:
    // Keeps a copy of the callable to run on every task.
    Worker(FUNC_t fn)
        : F(fn)
    {}

    // Run F on the task's bounds and return the task unchanged.
    inline task_t* svc(task_t* task) {
        DBG(printf("W(%d) <%ld,%ld>\n", get_my_id(), task->start, task->end));
        task_t &range = *task;
        F(range.start, range.end);
        return task;
    }

    FUNC_t F;
};
// Print a short synopsis and an example invocation to stderr, using argv[0]
// as the program name.
void usage(char *argv[]) {
    const char *prog = argv[0];
    std::cerr << "use: " << prog << " numworkers chunksize arraysize\n"
              << " example: " << prog << " 2 10 1000\n\n";
}
int main(int argc, char *argv[]) {
if (argc<4) { usage(argv); return -1; } int nw = atoi(argv[1]); long chunk = atol(argv[2]); long size = atol(argv[3]);
if (nw <= 0 || size <= 0) { usage(argv); return -1; } if (chunk<=0) chunk = size/nw; // create and initialize the array std::vector<double> V(size); for (size_t i = 0; i < (size_t)size; i++) V[i] = ( pow(i,3) / (i+1) ); DBG( for(size_t i=0;i<V.size();++i) printf("%g ", V[i]); printf("\n"); );
// this is the function to compute on each single element of the vector V auto F = [&V](const long start, const long stop) { for(long i=start; i < stop; ++i) { size_t k = V[i]; double r = 1.0; for(size_t j=0;j<k;++j) r*=r*sin(r/k); // just some dummy computation V[i] = r; } };
ffTime(START_TIME); if (nw == 1) F(0,size); else { std::vector<ff_node *> W; for(int i=0;i<nw;++i) W.push_back(new Worker<decltype(F)>(F)); ff_farm<> farm(W); farm.remove_collector(); // remove default collector Scheduler Sched(farm.getlb(),0,size,chunk,nw); farm.add_emitter(&Sched); farm.wrap_around(); if (farm.run_and_wait_end() < 0) { error("running farm\n"); return -1; } } ffTime(STOP_TIME); std::cout << "Time (ms) = " << ffTime(GET_TIME) << "\n";
#if defined(CHECK_RESULT)
// summing all elements double s=0; for (size_t i = 0; i < (size_t)size; i++) s+=V[i]; std::cout << "Result s=" << s << "\n";
#endif
return 0;
}