Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:sample_dynmap14

Questa è una vecchia versione del documento!


/* * Author: Massimo Torquati torquati [at] di [dot] unipi [dot] it * Date: Dicember 2014 */

/* Map skeleton implementation with dynamic scheduling. * SPM Class Work 5. * */

#include <cstdio> #include <iostream> #include <vector> #include <ff/farm.hpp>

selective debugging, compile with -DDEBUG to enable debugging info #if defined(DEBUG) #define DBG(X) X #else #define DBG(X) #endif using namespace ff; this is the task struct task_t {

  void set(long s, long e)  { start=s,end=e; }
  long start, end;

};

compare function for pairs static inline bool data_cmp(const std::pair<long,task_t> &a,const std::pair<long,task_t> &b) { return a.first < b.first; } this is the scheduler class Scheduler: public ff_node_t<task_t> { protected:

  // initialize the internal table (data vector)
  inline size_t init_data(long start, long stop) {
      const long numtasks  = stop-start;
      long totalnumtasks   = std::lrint(std::ceil(numtasks/(double)_chunk));
      long tt     = totalnumtasks;
      size_t ntxw = totalnumtasks / _nw;
      size_t r    = totalnumtasks % _nw;
      
      // try to keep the n. of tasks per worker as smaller as possible
      if (ntxw == 0 && r>=1) {  ntxw = 1, r = 0; }
      
      data.resize(_nw); taskv.resize(_nw); 
      long end, t=0, e;
      for(size_t i=0;i<_nw && totalnumtasks>0;++i, totalnumtasks-=t) {
          t       = ntxw + ( (r>1 && (i<r)) ? 1 : 0 );
          e       = start + (t*_chunk - 1) + 1;
          end     = (e<stop) ? e : stop;
          data[i].first=t;
          data[i].second.set(start,end);
          start   = (end-1)+ 1;
      }
      
      if (totalnumtasks) {
          assert(totalnumtasks==1);
          // try to keep the n. of tasks per worker as smaller as possible
          if (ntxw > 1) data[_nw-1].first += totalnumtasks;
          else { --tt, _chunk*=2; }
          data[_nw-1].second.end = stop;
      } 
      DBG(printf("init_data\n");
    for(size_t i=0;i<_nw;++i) {
          printf("W=%ld %ld <%ld,%ld>\n", i, data[i].first, data[i].second.start, data[i].second.end);
    }
          printf("totaltasks=%ld\n", tt);
          );
      return tt;
  }    
  
  inline void fillTask(task_t &task, const int id) {
long start = data[id].second.start;
long end = std::min(start+_chunk, data[id].second.end);
--data[id].first, (data[id].second).start = (end-1) + 1;
task.set(start, end);
  }
  

public:

  Scheduler(ff_loadbalancer* lb, long start, long stop, long chunk, int nw):
      lb(lb),_start(start),_stop(stop),_chunk(chunk),totaltasks(0),_nw(nw) {
      
      totaltasks = init_data(start,stop);
      assert(totaltasks>=1);
  }
  
  // get the next task if any
  bool nextTask(task_t &task, const int wid) {
      if (data[wid].first) {
          fillTask(task,wid);
          return true;
      }	
      // no available task for the current thread
      
      long maxid = (std::max_element(data.begin(),data.end(),data_cmp) - data.begin());
      if (data[maxid].first > 0)  { fillTask(task,maxid); return true; }
      
      return false; 
  }
  inline task_t* svc(task_t* t) {
      if (t==nullptr) {
          size_t remaining    = totaltasks;
          
          for(size_t wid=0;wid<_nw;++wid) {
              if (data[wid].first >0) {
                  long start = data[wid].second.start;
                  long end   = std::min(start+_chunk, data[wid].second.end);
                  taskv[wid].set(start, end);
                  lb->ff_send_out_to(&taskv[wid], (int)wid);
                  --remaining, --data[wid].first;
                  (data[wid].second).start = (end-1)+1;
              }
          }
          return (remaining>0) ? GO_ON : EOS;
      }
      
      auto wid =  lb->get_channel_id();
      if (--totaltasks <=0) return EOS;
      
      if (nextTask(*t, (int)wid)) lb->ff_send_out_to(t, int(wid));            
      else  lb->ff_send_out_to(EOS, (int)wid);
      
      return GO_ON;
  }

protected:

  ff_loadbalancer  *lb;
  long              _start,_stop, _chunk;  
  size_t            totaltasks, _nw;
  std::vector<std::pair<long, task_t> >  data;  // internal table of tasks
  std::vector<task_t>                    taskv; // tasks vector, avoiding dynanic memory allocation

};

template<typename FUNC_t> class Worker: public ff_node_t<task_t> { public:

  Worker(FUNC_t F):F(F) {}
  inline task_t* svc(task_t* task) {
DBG(printf("W(%d) <%ld,%ld>\n", get_my_id(), task->start, task->end));
      F(task->start,task->end);
      return task;
  }
  FUNC_t  F;

};

void usage(char *argv[]) {

  std::cerr << "use: " << argv[0] << " numworkers chunksize arraysize\n";
  std::cerr << "  example: " << argv[0] << " 2 10 1000\n\n";

}

int main(int argc, char *argv[]) {

  if (argc<4) {
usage(argv);
return -1;
  }
  int  nw    = atoi(argv[1]);
  long chunk = atol(argv[2]);
  long size  = atol(argv[3]);
  if (nw <= 0 || size <= 0) { usage(argv); return -1; }
  if (chunk<=0) chunk = size/nw;
  
  // create and initialize the array
  std::vector<double> V(size);
  for (size_t i = 0; i < (size_t)size; i++)  V[i] = ( pow(i,3) / (i+1) );
  DBG(
for(size_t i=0;i<V.size();++i)
    printf("%g ", V[i]);
printf("\n");
     );
  // this is the function to compute on each single element of the vector V
  auto F = [&V](const long start, const long stop) {
      for(long i=start; i < stop; ++i) {
          size_t k = V[i];
          double r = 1.0;
          
          for(size_t j=0;j<k;++j) r*=r*sin(r/k); // just some dummy computation
          
          V[i] = r;
      }
  };
  ffTime(START_TIME);
  if (nw == 1) 
      F(0,size);    
  else {
      std::vector<ff_node *> W;
      for(int i=0;i<nw;++i)
          W.push_back(new Worker<decltype(F)>(F));
      
      ff_farm<> farm(W);
      farm.remove_collector(); // remove default collector
      Scheduler Sched(farm.getlb(),0,size,chunk,nw);
      farm.add_emitter(&Sched);
      farm.wrap_around();
      
      if (farm.run_and_wait_end() < 0) {
          error("running farm\n");
          return -1;
      }
  }
  ffTime(STOP_TIME);
  std::cout << "Time (ms) = " << ffTime(GET_TIME) << "\n";

#if defined(CHECK_RESULT)

  // summing all elements
  double s=0;
  for (size_t i = 0; i < (size_t)size; i++)  
s+=V[i];
  std::cout << "Result s=" << s << "\n";

#endif

  return 0;

}

magistraleinformaticanetworking/spm/sample_dynmap14.1418797914.txt.gz · Ultima modifica: 17/12/2014 alle 06:31 (10 anni fa) da Massimo Torquati

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki