/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 */
/*
 * Author: Massimo Torquati torquati [at] di [dot] unipi [dot] it
 * Date:   December 2014
 */

/*
 * Map skeleton implementation with dynamic scheduling.
 * SPM Class Work 5.
 */

#include <algorithm>
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <utility>
#include <vector>

#include <ff/farm.hpp>

selective debugging, compile with -DDEBUG to enable debugging info #if defined(DEBUG) #define DBG(X) X #else #define DBG(X) #endif using namespace ff; this is the task struct task_t {

  void set(long s, long e)  { start=s,end=e; }
  long start, end;


compare function for pairs static inline bool data_cmp(const std::pair<long,task_t> &a,const std::pair<long,task_t> &b) { return a.first < b.first; } this is the scheduler class Scheduler: public ff_node_t<task_t> { protected:

  // initialize the internal table (data vector)
  inline size_t init_data(long start, long stop) {
      const long numtasks  = stop-start;
      long totalnumtasks   = std::lrint(std::ceil(numtasks/(double)_chunk));
      long tt     = totalnumtasks;
      size_t ntxw = totalnumtasks / _nw;
      size_t r    = totalnumtasks % _nw;
      // try to keep the n. of tasks per worker as smaller as possible
      if (ntxw == 0 && r>=1) {  ntxw = 1, r = 0; }
      data.resize(_nw); taskv.resize(_nw); 
      long end, t=0, e;
      for(size_t i=0;i<_nw && totalnumtasks>0;++i, totalnumtasks-=t) {
          t       = ntxw + ( (r>1 && (i<r)) ? 1 : 0 );
          e       = start + (t*_chunk - 1) + 1;
          end     = (e<stop) ? e : stop;
          start   = (end-1)+ 1;
      if (totalnumtasks) {
          // try to keep the n. of tasks per worker as smaller as possible
          if (ntxw > 1) data[_nw-1].first += totalnumtasks;
          else { --tt, _chunk*=2; }
          data[_nw-1].second.end = stop;
    for(size_t i=0;i<_nw;++i) {
          printf("W=%ld %ld <%ld,%ld>\n", i, data[i].first, data[i].second.start, data[i].second.end);
          printf("totaltasks=%ld\n", tt);
      return tt;
  inline void fillTask(task_t &task, const int id) {
long start = data[id].second.start;
long end = std::min(start+_chunk, data[id].second.end);
--data[id].first, (data[id].second).start = (end-1) + 1;
task.set(start, end);


  Scheduler(ff_loadbalancer* lb, long start, long stop, long chunk, int nw):
      lb(lb),_start(start),_stop(stop),_chunk(chunk),totaltasks(0),_nw(nw) {
      totaltasks = init_data(start,stop);
  // get the next task if any
  bool nextTask(task_t &task, const int wid) {
      if (data[wid].first) {
          return true;
      // no available task for the current thread
      long maxid = (std::max_element(data.begin(),data.end(),data_cmp) - data.begin());
      if (data[maxid].first > 0)  { fillTask(task,maxid); return true; }
      return false; 
  inline task_t* svc(task_t* t) {
      if (t==nullptr) {
          size_t remaining    = totaltasks;
          for(size_t wid=0;wid<_nw;++wid) {
              if (data[wid].first >0) {
                  long start = data[wid].second.start;
                  long end   = std::min(start+_chunk, data[wid].second.end);
                  taskv[wid].set(start, end);
                  lb->ff_send_out_to(&taskv[wid], (int)wid);
                  --remaining, --data[wid].first;
                  (data[wid].second).start = (end-1)+1;
          return (remaining>0) ? GO_ON : EOS;
      auto wid =  lb->get_channel_id();
      if (--totaltasks <=0) return EOS;
      if (nextTask(*t, (int)wid)) lb->ff_send_out_to(t, int(wid));            
      else  lb->ff_send_out_to(EOS, (int)wid);
      return GO_ON;


  ff_loadbalancer  *lb;
  long              _start,_stop, _chunk;  
  size_t            totaltasks, _nw;
  std::vector<std::pair<long, task_t> >  data;  // internal table of tasks
  std::vector<task_t>                    taskv; // tasks vector, avoiding dynanic memory allocation


// Generic worker: applies the user function F to the index range carried by
// each task it receives.
template<typename FUNC_t>
class Worker: public ff_node_t<task_t> {
public:
    // F is invoked as F(start, end) for every task
    Worker(FUNC_t F):F(F) {}

    // Process the range [task->start, task->end) and return the same task
    // object from svc.
    inline task_t* svc(task_t* task) {
        DBG(printf("W(%d) <%ld,%ld>\n", get_my_id(), task->start, task->end));
        // do the actual work: without this call the task is returned untouched
        F(task->start, task->end);
        return task;
    }

    FUNC_t  F;
};


// Print the command line synopsis and an example invocation on standard
// error; argv[0] is used as the program name.
void usage(char *argv[]) {
    std::cerr << "use: " << argv[0] << " numworkers chunksize arraysize\n";
    std::cerr << "  example: " << argv[0] << " 2 10 1000\n\n";
}


int main(int argc, char *argv[]) {

  if (argc<4) {
return -1;
  int  nw    = atoi(argv[1]);
  long chunk = atol(argv[2]);
  long size  = atol(argv[3]);
  if (nw <= 0 || size <= 0) { usage(argv); return -1; }
  if (chunk<=0) chunk = size/nw;
  // create and initialize the array
  std::vector<double> V(size);
  for (size_t i = 0; i < (size_t)size; i++)  V[i] = ( pow(i,3) / (i+1) );
for(size_t i=0;i<V.size();++i)
    printf("%g ", V[i]);
  // this is the function to compute on each single element of the vector V
  auto F = [&V](const long start, const long stop) {
      for(long i=start; i < stop; ++i) {
          size_t k = V[i];
          double r = 1.0;
          for(size_t j=0;j<k;++j) r*=r*sin(r/k); // just some dummy computation
          V[i] = r;
  if (nw == 1) 
  else {
      std::vector<ff_node *> W;
      for(int i=0;i<nw;++i)
          W.push_back(new Worker<decltype(F)>(F));
      ff_farm<> farm(W);
      farm.remove_collector(); // remove default collector
      Scheduler Sched(farm.getlb(),0,size,chunk,nw);
      if (farm.run_and_wait_end() < 0) {
          error("running farm\n");
          return -1;
  std::cout << "Time (ms) = " << ffTime(GET_TIME) << "\n";

#if defined(CHECK_RESULT)

  // summing all elements
  double s=0;
  for (size_t i = 0; i < (size_t)size; i++)  
  std::cout << "Result s=" << s << "\n";


  return 0;


