magistraleinformaticanetworking:spm:sample_dynmap14
- dynmap.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* *************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * As a special exception, you may use this file as part of a free software * library without restriction. Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. * **************************************************************************** */ /* * Author: Massimo Torquati <[email protected]> * Date: Dicember 2014 */ /* Map skeleton implementation with dynamic scheduling. * SPM Class Work 5. * */ #include <cstdio> #include <iostream> #include <vector> #include <ff/farm.hpp> // selective debugging, compile with -DDEBUG to enable debugging info #if defined(DEBUG) #define DBG(X) X #else #define DBG(X) #endif using namespace ff; // this is the task struct task_t { void set(long s, long e) { start=s,end=e; } long start, end; }; // compare function for pairs static inline bool data_cmp(const std::pair<long,task_t> &a,const std::pair<long,task_t> &b) { return a.first < b.first; } // this is the scheduler class Scheduler: public ff_node_t<task_t> { protected: // initialize the internal table (data vector) inline size_t init_data(long start, long stop) { const long numtasks = stop-start; long totalnumtasks = std::lrint(std::ceil(numtasks/(double)_chunk)); long tt = totalnumtasks; size_t ntxw = totalnumtasks / _nw; size_t r = totalnumtasks % _nw; // try to keep the n. of tasks per worker as smaller as possible if (ntxw == 0 && r>=1) { ntxw = 1, r = 0; } data.resize(_nw); taskv.resize(_nw); long end, t=0, e; for(size_t i=0;i<_nw && totalnumtasks>0;++i, totalnumtasks-=t) { t = ntxw + ( (r>1 && (i<r)) ? 1 : 0 ); e = start + (t*_chunk - 1) + 1; end = (e<stop) ? e : stop; data[i].first=t; data[i].second.set(start,end); start = (end-1)+ 1; } if (totalnumtasks) { assert(totalnumtasks==1); // try to keep the n. of tasks per worker as smaller as possible if (ntxw > 1) data[_nw-1].first += totalnumtasks; else { --tt, _chunk*=2; } data[_nw-1].second.end = stop; } DBG(printf("init_data\n"); for(size_t i=0;i<_nw;++i) { printf("W=%ld %ld <%ld,%ld>\n", i, data[i].first, data[i].second.start, data[i].second.end); } printf("totaltasks=%ld\n", tt); ); return tt; } inline void fillTask(task_t &task, const int id) { long start = data[id].second.start; long end = std::min(start+_chunk, data[id].second.end); --data[id].first, (data[id].second).start = (end-1) + 1; task.set(start, end); } public: Scheduler(ff_loadbalancer* lb, long start, long stop, long chunk, int nw): lb(lb),_start(start),_stop(stop),_chunk(chunk),totaltasks(0),_nw(nw) { totaltasks = init_data(start,stop); assert(totaltasks>=1); } // get the next task if any bool nextTask(task_t &task, const int wid) { if (data[wid].first) { fillTask(task,wid); return true; } // no available task for the current thread long maxid = (std::max_element(data.begin(),data.end(),data_cmp) - data.begin()); if (data[maxid].first > 0) { fillTask(task,maxid); return true; } return false; } inline task_t* svc(task_t* t) { if (t==nullptr) { size_t remaining = totaltasks; for(size_t wid=0;wid<_nw;++wid) { if (data[wid].first >0) { long start = data[wid].second.start; long end = std::min(start+_chunk, data[wid].second.end); taskv[wid].set(start, end); lb->ff_send_out_to(&taskv[wid], (int)wid); --remaining, --data[wid].first; (data[wid].second).start = (end-1)+1; } } return (remaining>0) ? GO_ON : EOS; } auto wid = lb->get_channel_id(); if (--totaltasks <=0) return EOS; if (nextTask(*t, (int)wid)) lb->ff_send_out_to(t, int(wid)); else lb->ff_send_out_to(EOS, (int)wid); return GO_ON; } protected: ff_loadbalancer *lb; long _start,_stop, _chunk; size_t totaltasks, _nw; std::vector<std::pair<long, task_t> > data; // internal table of tasks std::vector<task_t> taskv; // tasks vector, avoiding dynanic memory allocation }; template<typename FUNC_t> class Worker: public ff_node_t<task_t> { public: Worker(FUNC_t F):F(F) {} inline task_t* svc(task_t* task) { DBG(printf("W(%d) <%ld,%ld>\n", get_my_id(), task->start, task->end)); F(task->start,task->end); return task; } FUNC_t F; }; void usage(char *argv[]) { std::cerr << "use: " << argv[0] << " numworkers chunksize arraysize\n"; std::cerr << " example: " << argv[0] << " 2 10 1000\n\n"; } int main(int argc, char *argv[]) { if (argc<4) { usage(argv); return -1; } int nw = atoi(argv[1]); long chunk = atol(argv[2]); long size = atol(argv[3]); if (nw <= 0 || size <= 0) { usage(argv); return -1; } if (chunk<=0) chunk = size/nw; // create and initialize the array std::vector<double> V(size); for (size_t i = 0; i < (size_t)size; i++) V[i] = ( pow(i,3) / (i+1) ); DBG( for(size_t i=0;i<V.size();++i) printf("%g ", V[i]); printf("\n"); ); // this is the function to compute on each single element of the vector V auto F = [&V](const long start, const long stop) { for(long i=start; i < stop; ++i) { size_t k = V[i]; double r = 1.0; for(size_t j=0;j<k;++j) r*=r*sin(r/k); // just some dummy computation V[i] = r; } }; ffTime(START_TIME); if (nw == 1) F(0,size); else { std::vector<ff_node *> W; for(int i=0;i<nw;++i) W.push_back(new Worker<decltype(F)>(F)); ff_farm<> farm(W); farm.remove_collector(); // remove default collector Scheduler Sched(farm.getlb(),0,size,chunk,nw); farm.add_emitter(&Sched); farm.wrap_around(); if (farm.run_and_wait_end() < 0) { error("running farm\n"); return -1; } } ffTime(STOP_TIME); std::cout << "Time (ms) = " << ffTime(GET_TIME) << "\n"; #if defined(CHECK_RESULT) // summing all elements double s=0; for (size_t i = 0; i < (size_t)size; i++) s+=V[i]; std::cout << "Result s=" << s << "\n"; #endif return 0; }
magistraleinformaticanetworking/spm/sample_dynmap14.txt · Ultima modifica: 17/12/2014 alle 06:33 (10 anni fa) da Massimo Torquati