GNUnet  0.11.x
testbed_api_operations.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2008--2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30 #include "testbed_api_sd.h"
31 
/**
 * The number of readings containing past operations' timing information
 * that are kept track of.
 */
36 #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
37 
/**
 * The number of parallel operations an adaptive queue starts with by
 * default.  adapt_parallelism() also uses it as the lower bound of the
 * parallelism it computes.
 */
42 #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
43 
/**
 * An entry in the operation queue.
 */
47 struct QueueEntry
48 {
 /* The next DLL pointer. */
52  struct QueueEntry *next;
53 
 /* The prev DLL pointer. */
57  struct QueueEntry *prev;
58 
 /* NOTE(review): the member declared on the next line is blank in this
    listing.  Code in this file reads and writes entry->op (the operation
    this entry holds) -- confirm against the original source. */
63 
 /* How many units of resources does the operation need. */
67  unsigned int nres;
68 };
69 
70 
75 struct OperationQueue;
76 
77 
/**
 * A slot to record time taken by an operation.
 */
81 struct TimeSlot
82 {
 /* DLL next pointer. */
86  struct TimeSlot *next;
87 
 /* DLL prev pointer. */
91  struct TimeSlot *prev;
92 
 /* NOTE(review): the two member declarations on the following lines are
    blank in this listing.  Code in this file accesses tslot->queue and
    tslot->op -- confirm names and order against the original source. */
97 
102 
 /* Accumulated time. */
106  struct GNUNET_TIME_Relative tsum;
107 
 /* Number of timing values accumulated. */
111  unsigned int nvals;
112 };
113 
114 
/* NOTE(review): the line naming this struct is absent from this listing.
   The members below match the fctx-> accesses made elsewhere in the file,
   so this is presumably struct FeedbackCtx (context for operation queues of
   type OPERATION_QUEUE_TYPE_ADAPTIVE) -- confirm. */
119 {
 /* Handle for calculating standard deviation. */
123  struct SDHandle *sd;
124 
 /* NOTE(review): three member declarations are blank here.  Code in this
    file uses fctx->alloc_head, fctx->alloc_tail and fctx->tslots_freeptr --
    confirm names and order against the original source. */
129 
134 
140 
 /* Number of time slots filled so far. */
144  unsigned int tslots_filled;
145 
 /* Bound on the maximum number of operations which can be active. */
149  unsigned int max_active_bound;
150 
 /* Number of operations that have failed. */
154  unsigned int nfailed;
155 };
156 
157 
/* NOTE(review): the line naming this struct is absent from this listing.
   The members below match the opq-> / queue-> accesses made elsewhere in
   the file, so this is presumably struct OperationQueue -- confirm. */
163 {
 /* NOTE(review): eight member declarations are blank here.  Code in this
    file uses the DLL head/tail pairs wq_head/wq_tail (wait queue),
    rq_head/rq_tail (ready queue), aq_head/aq_tail (active queue) and
    nq_head/nq_tail (inactive queue) -- confirm names and order. */
169 
174 
180 
185 
191 
196 
203 
208 
 /* Feedback context; only relevant for adaptive operation queues. */
213  struct FeedbackCtx *fctx;
214 
 /* NOTE(review): one member declaration is blank here; code in this file
    reads queue->type -- confirm. */
219 
 /* Number of operations that are currently active in this queue. */
223  unsigned int active;
224 
 /* Max number of operations which can be active at any time in this queue. */
231  unsigned int max_active;
232 
 /* The number of resources occupied by failed operations in the current shot. */
238  unsigned int overload;
239 
 /* Is this queue marked for expiry? */
243  unsigned int expired;
244 };
245 
246 
251 {
256 
261 
266 
271 
279 };
280 
281 
286 {
291 
296 
301 };
302 
303 
/* NOTE(review): the line naming this struct is absent from this listing.
   The members below match the op-> accesses made elsewhere in the file, so
   this is presumably struct GNUNET_TESTBED_Operation -- confirm.  Several
   member declarations are blank; code in this file additionally uses
   op->start, op->release, op->queues, op->qentries, op->rq_entry,
   op->tslots_head, op->tslots_tail and op->state. */
308 {
313 
319 
 /* Closure for callbacks. */
323  void *cb_cls;
324 
329 
335 
 /* Array of number of resources an operation need from each queue. */
340  unsigned int *nres;
341 
347 
352 
357 
 /* The time at which the operation is started. */
361  struct GNUNET_TIME_Absolute tstart;
362 
 /* Number of queues in the operation queues array. */
366  unsigned int nqueues;
367 
372 
 /* Is this a failed operation? */
376  int failed;
377 };
378 
/* DLL head for the ready queue. */
382 static struct ReadyQueueEntry *rq_head;
383 
/* DLL tail for the ready queue. */
387 static struct ReadyQueueEntry *rq_tail;
388 
/* Array of operation queues which are to be destroyed. */
392 static struct OperationQueue **expired_opqs;
393 
/* Number of expired operation queues in the above array. */
397 static unsigned int n_expired_opqs;
398 
403 
404 
/**
 * Assigns the given operation a time slot from the given operation queue.
 *
 * NOTE(review): the line carrying the function name and first parameter,
 * and two statements of the body, are absent from this listing.  The file's
 * symbol index gives the signature as
 * assign_timeslot (struct GNUNET_TESTBED_Operation *op,
 *                  struct OperationQueue *queue).
 *
 * @param queue the operation queue whose feedback context supplies the slot
 */
412 static void
414  struct OperationQueue *queue)
415 {
416  struct FeedbackCtx *fctx = queue->fctx;
417  struct TimeSlot *tslot;
418 
 /* take the first slot off the queue's list of free slots; one must exist */
420  tslot = fctx->alloc_head;
421  GNUNET_assert (NULL != tslot);
422  GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
 /* record which operation the slot is now allocated to */
424  tslot->op = op;
425 }
426 
427 
435 static void
436 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
437 {
438  struct OperationQueue *opq;
439  struct QueueEntry *entry;
440 
441  opq = op->queues[index];
442  entry = op->qentries[index];
443  switch (op->state)
444  {
445  case OP_STATE_INIT:
446  GNUNET_assert (0);
447  break;
448 
449  case OP_STATE_WAITING:
450  GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
451  break;
452 
453  case OP_STATE_READY:
454  GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
455  break;
456 
457  case OP_STATE_ACTIVE:
458  GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
459  break;
460 
461  case OP_STATE_INACTIVE:
462  GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
463  break;
464  }
465 }
466 
467 
/**
 * Changes the state of the operation while moving its associated queue
 * entries between the lists of the operation's queues.
 *
 * NOTE(review): the line carrying the function name and parameters is
 * absent from this listing; the file's symbol index gives the signature as
 * change_state (struct GNUNET_TESTBED_Operation *op,
 *               enum OperationState state).
 * The statement inside each non-INIT case of the second switch is also
 * absent; presumably it links the entry into the list matching the new
 * state -- confirm against the original source.
 */
475 static void
477 {
478  struct QueueEntry *entry;
479  struct OperationQueue *opq;
480  unsigned int cnt;
481  unsigned int s;
482 
 /* the target state must differ from the current one and may never be INIT;
    queue entries exist for every state except INIT */
483  GNUNET_assert (OP_STATE_INIT != state);
484  GNUNET_assert (NULL != op->queues);
485  GNUNET_assert (NULL != op->nres);
486  GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
487  GNUNET_assert (op->state != state);
488  for (cnt = 0; cnt < op->nqueues; cnt++)
489  {
490  if (OP_STATE_INIT == op->state)
491  {
 /* first transition: create the entry for this queue */
492  entry = GNUNET_new (struct QueueEntry);
493  entry->op = op;
494  entry->nres = op->nres[cnt];
 /* the append macro grows its size argument, so a scratch copy of the
    loop index is passed instead of cnt itself */
495  s = cnt;
496  GNUNET_array_append (op->qentries, s, entry);
497  }
498  else
499  {
 /* later transitions: unlink the entry from the list of the old state */
500  entry = op->qentries[cnt];
501  remove_queue_entry (op, cnt);
502  }
503  opq = op->queues[cnt];
504  switch (state)
505  {
506  case OP_STATE_INIT:
507  GNUNET_assert (0);
508  break;
509 
510  case OP_STATE_WAITING:
512  break;
513 
514  case OP_STATE_READY:
516  break;
517 
518  case OP_STATE_ACTIVE:
520  break;
521 
522  case OP_STATE_INACTIVE:
524  break;
525  }
526  }
527  op->state = state;
528 }
529 
530 
537 static void
539 {
540  GNUNET_assert (NULL != op->rq_entry);
541  GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
542  GNUNET_free (op->rq_entry);
543  op->rq_entry = NULL;
544  if ((NULL == rq_head) && (NULL != process_rq_task_id))
545  {
546  GNUNET_SCHEDULER_cancel (process_rq_task_id);
547  process_rq_task_id = NULL;
548  }
549 }
550 
551 
/**
 * Processes the ready queue by calling the operation start callback of the
 * operation at the head of the ready queue.  The task reschedules itself
 * for as long as the ready queue is not empty.
 *
 * NOTE(review): the declaration of the local `op` and two statements of
 * the body (one before and one after the loop over the queues) are absent
 * from this listing.
 *
 * @param cls closure; not used by the visible code
 */
560 static void
561 process_rq_task (void *cls)
562 {
564  struct OperationQueue *queue;
565  unsigned int cnt;
566 
 /* this task is running now, so it is no longer pending */
567  process_rq_task_id = NULL;
568  GNUNET_assert (NULL != rq_head);
569  GNUNET_assert (NULL != (op = rq_head->op));
570  rq_remove (op);
 /* more ready operations remain: handle the next one in a fresh task */
571  if (NULL != rq_head)
572  process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
 /* adaptive queues get a time slot allocated for the operation */
574  for (cnt = 0; cnt < op->nqueues; cnt++)
575  {
576  queue = op->queues[cnt];
577  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
578  assign_timeslot (op, queue);
579  }
581  if (NULL != op->start)
582  op->start (op->cb_cls);
583 }
584 
585 
591 static void
593 {
594  struct ReadyQueueEntry *rq_entry;
595 
596  GNUNET_assert (NULL == op->rq_entry);
597  rq_entry = GNUNET_new (struct ReadyQueueEntry);
598  rq_entry->op = op;
599  GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
600  op->rq_entry = rq_entry;
601  if (NULL == process_rq_task_id)
602  process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
603 }
604 
605 
613 static int
615 {
616  if ((NULL != opq->wq_head)
617  || (NULL != opq->rq_head)
618  || (NULL != opq->aq_head)
619  || (NULL != opq->nq_head))
620  return GNUNET_NO;
621  return GNUNET_YES;
622 }
623 
624 
/**
 * Checks if the given operation queue has enough resources to provide for
 * the operation of the given queue entry.  If the free capacity alone is
 * not sufficient, entries of the queue's inactive list are collected as
 * eviction candidates until the shortfall is covered.
 *
 * NOTE(review): the line carrying the function name and first parameter,
 * the declaration of the local `op`, and the condition introducing the
 * first braced block are absent from this listing.  The file's symbol index
 * gives the signature as
 * decide_capacity (struct OperationQueue *opq, struct QueueEntry *entry,
 *                  struct GNUNET_TESTBED_Operation ***ops_,
 *                  unsigned int *n_ops_).
 *
 * @param entry the queue entry of the operation asking for resources
 * @param ops_ if not NULL, receives the array of operations to evict (NULL
 *          when none are needed); ownership of the array passes to the
 *          caller
 * @param n_ops_ if not NULL, receives the number of operations in that
 *          array
 * @return GNUNET_YES if the resources can be provided (possibly only after
 *          evicting the returned operations); GNUNET_NO if not
 */
641 static int
643  struct QueueEntry *entry,
644  struct GNUNET_TESTBED_Operation ***ops_,
645  unsigned int *n_ops_)
646 {
647  struct QueueEntry **evict_entries;
648  struct GNUNET_TESTBED_Operation **ops;
650  unsigned int n_ops;
651  unsigned int n_evict_entries;
652  unsigned int need;
653  unsigned int max;
654  int deficit;
655  int rval;
656 
657  GNUNET_assert (NULL != (op = entry->op));
658  GNUNET_assert (0 < (need = entry->nres));
659  ops = NULL;
660  n_ops = 0;
661  evict_entries = NULL;
662  n_evict_entries = 0;
663  rval = GNUNET_YES;
 /* in this branch the usable capacity is reduced by the resources counted
    in `overload`; the branch requires a feedback context
    (NOTE(review): its condition is not visible here) */
665  {
666  GNUNET_assert (NULL != opq->fctx);
667  GNUNET_assert (opq->max_active >= opq->overload);
668  max = opq->max_active - opq->overload;
669  }
670  else
671  max = opq->max_active;
 /* already above the limit: nothing can be granted */
672  if (opq->active > max)
673  {
674  rval = GNUNET_NO;
675  goto ret;
676  }
 /* enough free capacity: grant without evicting anything */
677  if ((opq->active + need) <= max)
678  goto ret;
 /* walk the inactive list, collecting entries until their resources cover
    the shortfall */
679  deficit = need - (max - opq->active);
680  for (entry = opq->nq_head;
681  (0 < deficit) && (NULL != entry);
682  entry = entry->next)
683  {
684  GNUNET_array_append (evict_entries, n_evict_entries, entry);
685  deficit -= entry->nres;
686  }
 /* even evicting every inactive entry would not free enough */
687  if (0 < deficit)
688  {
689  rval = GNUNET_NO;
690  goto ret;
691  }
 /* translate the collected entries into their operations; the loop counter
    is advanced by the append macro itself */
692  for (n_ops = 0; n_ops < n_evict_entries;)
693  {
694  op = evict_entries[n_ops]->op;
695  GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
696  }
697 
698 ret:
699  GNUNET_free_non_null (evict_entries);
700  if (NULL != ops_)
701  *ops_ = ops;
702  else
 /* NOTE(review): `ops` is NULL on every path that collected no evictions;
    the statement above uses GNUNET_free_non_null for that situation --
    confirm that GNUNET_free tolerates a NULL pointer or switch to the
    _non_null variant */
703  GNUNET_free (ops);
704  if (NULL != n_ops_)
705  *n_ops_ = n_ops;
706  return rval;
707 }
708 
709 
719 static void
721  unsigned int *n_old,
722  struct GNUNET_TESTBED_Operation **new,
723  unsigned int n_new)
724 {
725  struct GNUNET_TESTBED_Operation **cur;
726  unsigned int i;
727  unsigned int j;
728  unsigned int n_cur;
729 
730  GNUNET_assert (NULL != old);
731  n_cur = *n_old;
732  cur = *old;
733  for (i = 0; i < n_new; i++)
734  {
735  for (j = 0; j < *n_old; j++)
736  {
737  if (new[i] == cur[j])
738  break;
739  }
740  if (j < *n_old)
741  continue;
742  GNUNET_array_append (cur, n_cur, new[j]);
743  }
744  *old = cur;
745  *n_old = n_cur;
746 }
747 
748 
/**
 * Checks for the readiness of an operation and schedules an operation
 * start task.
 *
 * Every queue of the operation is asked (via decide_capacity) whether it
 * can provide the resources.  If one cannot, the operation is not ready.
 * If resources are only available after evicting other operations, those
 * are released and GNUNET_YES is returned without touching the ready
 * queue here.  Otherwise the resources are accounted as active in every
 * queue and the operation is added to the ready queue.
 *
 * NOTE(review): the line carrying the function name and parameter, and
 * three statements of the body, are absent from this listing.  The file's
 * symbol index gives the signature as
 * check_readiness (struct GNUNET_TESTBED_Operation *op).
 *
 * @return GNUNET_NO if some queue cannot provide the resources;
 *          GNUNET_YES otherwise
 */
754 static int
756 {
757  struct GNUNET_TESTBED_Operation **evict_ops;
758  struct GNUNET_TESTBED_Operation **ops;
759  unsigned int n_ops;
760  unsigned int n_evict_ops;
761  unsigned int i;
762 
763  GNUNET_assert (NULL == op->rq_entry);
765  evict_ops = NULL;
766  n_evict_ops = 0;
767  for (i = 0; i < op->nqueues; i++)
768  {
769  ops = NULL;
770  n_ops = 0;
771  if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
772  &ops, &n_ops))
773  {
 /* one queue refuses: discard the candidates gathered so far */
774  GNUNET_free_non_null (evict_ops);
775  return GNUNET_NO;
776  }
 /* this queue needs no evictions */
777  if (NULL == ops)
778  continue;
 /* accumulate this queue's eviction candidates into the combined list */
779  merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
780  GNUNET_free (ops);
781  }
782  if (NULL != evict_ops)
783  {
784  for (i = 0; i < n_evict_ops; i++)
785  GNUNET_TESTBED_operation_release_ (evict_ops[i]);
786  GNUNET_free (evict_ops);
787  evict_ops = NULL;
788  /* Evicting the operations should schedule this operation */
790  return GNUNET_YES;
791  }
 /* no evictions needed: account the resources as active in each queue */
792  for (i = 0; i < op->nqueues; i++)
793  op->queues[i]->active += op->nres[i];
795  rq_add (op);
796  return GNUNET_YES;
797 }
798 
799 
/**
 * Defers a ready to be executed operation back to waiting.  The operation
 * is taken off the ready queue and the resources accounted for it are
 * returned to each of its queues.
 *
 * NOTE(review): the line carrying the function name and parameter, and two
 * statements of the body (one at the start, one at the end), are absent
 * from this listing.  The file's symbol index gives the signature as
 * defer (struct GNUNET_TESTBED_Operation *op).
 */
805 static void
807 {
808  unsigned int i;
809 
811  rq_remove (op);
812  for (i = 0; i < op->nqueues; i++)
813  {
 /* a queue can never give back more than it has accounted as active */
814  GNUNET_assert (op->queues[i]->active >= op->nres[i]);
815  op->queues[i]->active -= op->nres[i];
816  }
818 }
819 
820 
/**
 * Cleans up the array of time slots of an operation queue and resets the
 * related bookkeeping of the queue's feedback context.
 *
 * NOTE(review): the line carrying the function name and parameter, the
 * declaration of the local `op`, the statement acting on each slot that is
 * still allocated to an operation, and the statement before the pointer is
 * cleared are absent from this listing.  The file's symbol index gives the
 * signature as cleanup_tslots (struct OperationQueue *queue).
 */
828 static void
830 {
831  struct FeedbackCtx *fctx = queue->fctx;
832  struct TimeSlot *tslot;
834  unsigned int cnt;
835 
836  GNUNET_assert (NULL != fctx);
 /* visit every slot; slots not allocated to an operation need no work */
837  for (cnt = 0; cnt < queue->max_active; cnt++)
838  {
839  tslot = &fctx->tslots_freeptr[cnt];
840  op = tslot->op;
841  if (NULL == op)
842  continue;
844  }
 /* forget the slot chunk and the list of free slots */
846  fctx->tslots_freeptr = NULL;
847  fctx->alloc_head = NULL;
848  fctx->alloc_tail = NULL;
849  fctx->tslots_filled = 0;
850 }
851 
852 
/**
 * Cleans up the existing timing slots and sets new timing slots in the
 * given queue to accommodate the given number of parallel operations.  The
 * number is capped at the feedback context's max_active_bound.
 *
 * NOTE(review): the line carrying the function name and parameters, the
 * first line of the statement inside the loop, and the final statement are
 * absent from this listing.  The file's symbol index gives the signature
 * as adaptive_queue_set_max_active (struct OperationQueue *queue,
 *                                   unsigned int n).
 */
862 static void
864 {
865  struct FeedbackCtx *fctx = queue->fctx;
866  struct TimeSlot *tslot;
867  unsigned int cnt;
868 
869  cleanup_tslots (queue);
870  n = GNUNET_MIN (n, fctx->max_active_bound);
 /* a fresh chunk of n slots; the failure count starts over */
871  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof(struct TimeSlot));
872  fctx->nfailed = 0;
873  for (cnt = 0; cnt < n; cnt++)
874  {
875  tslot = &fctx->tslots_freeptr[cnt];
876  tslot->queue = queue;
878  tslot);
879  }
881 }
882 
883 
/**
 * Adapts parallelism in an adaptive queue by using the statistical data
 * from the feedback context.
 *
 * The accumulated times of all time slots are averaged over the number of
 * non-failed readings, the average is added to the standard deviation
 * handle, and the deviation factor obtained for it selects the new
 * parallelism.
 *
 * NOTE(review): the line carrying the function name and parameter and
 * several statements are absent from this listing (the branch taken when
 * max_active is 1, and the first line of the call whose result is compared
 * with GNUNET_SYSERR).  The file's symbol index gives the signature as
 * adapt_parallelism (struct OperationQueue *queue).
 */
890 static void
892 {
893  struct GNUNET_TIME_Relative avg;
894  struct FeedbackCtx *fctx;
895  struct TimeSlot *tslot;
896  int sd;
897  unsigned int nvals;
898  unsigned int cnt;
899  unsigned int parallelism;
900 
901  avg = GNUNET_TIME_UNIT_ZERO;
902  nvals = 0;
903  fctx = queue->fctx;
 /* sum up the accumulated time and the number of readings of every slot */
904  for (cnt = 0; cnt < queue->max_active; cnt++)
905  {
906  tslot = &fctx->tslots_freeptr[cnt];
907  avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
908  nvals += tslot->nvals;
909  }
910  GNUNET_assert (nvals >= queue->max_active);
911  GNUNET_assert (fctx->nfailed <= nvals);
 /* readings of failed operations do not count towards the average */
912  nvals -= fctx->nfailed;
913  if (0 == nvals)
914  {
 /* every reading came from a failed operation: halve the parallelism
    (NOTE(review): the statement for max_active == 1 is not visible) */
915  if (1 == queue->max_active)
917  else
918  adaptive_queue_set_max_active (queue, queue->max_active / 2);
919  return;
920  }
921  avg = GNUNET_TIME_relative_divide (avg, nvals);
922  GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
923  if (GNUNET_SYSERR ==
925  (unsigned int) avg.rel_value_us,
926  &sd))
927  {
928  adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
929  return;
930  }
931 
 /* map the deviation factor to a new parallelism:
      -1      -> one more
      <= -2   -> double
       1      -> one less
      >= 2    -> half
    NOTE(review): for sd == 0 none of the branches fires, so parallelism
    stays 0 and the GNUNET_MAX below turns it into
    ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE rather than keeping the current
    value -- confirm this is intended.  The same GNUNET_MAX keeps the result
    from ever dropping below that default. */
932  parallelism = 0;
933  if (-1 == sd)
934  parallelism = queue->max_active + 1;
935  if (sd <= -2)
936  parallelism = queue->max_active * 2;
937  if (1 == sd)
938  parallelism = queue->max_active - 1;
939  if (2 <= sd)
940  parallelism = queue->max_active / 2;
941  parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
942  adaptive_queue_set_max_active (queue, parallelism);
943 
 /* the block below is compiled out; it is the previous selection scheme */
944 #if 0
945  /* old algorithm */
946  if (sd < 0)
947  sd = 0;
948  GNUNET_assert (0 <= sd);
949  // GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
950  if (0 == sd)
951  {
952  adaptive_queue_set_max_active (queue, queue->max_active * 2);
953  return;
954  }
955  if (1 == sd)
956  {
957  adaptive_queue_set_max_active (queue, queue->max_active + 1);
958  return;
959  }
960  if (1 == queue->max_active)
961  {
963  return;
964  }
965  if (2 == sd)
966  {
967  adaptive_queue_set_max_active (queue, queue->max_active - 1);
968  return;
969  }
970  adaptive_queue_set_max_active (queue, queue->max_active / 2);
971 #endif
972 }
973 
974 
/**
 * Updates the time slots allocated to the operation with the operation's
 * completion time.  Once every slot of a queue has received its first
 * reading, the parallelism of that queue is adapted.
 *
 * NOTE(review): the line carrying the function name and parameter, the
 * statement assigning `t`, and two statements at the start of the loop body
 * (one of them only partly visible) are absent from this listing.  The
 * file's symbol index gives the signature as
 * update_tslots (struct GNUNET_TESTBED_Operation *op).
 */
982 static void
984 {
985  struct OperationQueue *queue;
986  struct GNUNET_TIME_Relative t;
987  struct TimeSlot *tslot;
988  struct FeedbackCtx *fctx;
989  unsigned int i;
990 
992  while (NULL != (tslot = op->tslots_head)) /* update time slots */
993  {
994  queue = tslot->queue;
995  fctx = queue->fctx;
 /* the slot is no longer allocated to this operation */
997  tslot->op = NULL;
999  tslot);
1000  if (op->failed)
1001  {
 /* count the failure and mark the resources this operation held in the
    slot's queue as overload */
1002  fctx->nfailed++;
1003  for (i = 0; i < op->nqueues; i++)
1004  if (queue == op->queues[i])
1005  break;
1006  GNUNET_assert (i != op->nqueues);
1007  op->queues[i]->overload += op->nres[i];
1008  }
1009  tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
 /* only the first reading of a slot counts towards tslots_filled */
1010  if (0 != tslot->nvals++)
1011  continue;
1012  fctx->tslots_filled++;
1013  if (queue->max_active == fctx->tslots_filled)
1014  adapt_parallelism (queue);
1015  }
1016 }
1017 
1018 
1027 struct GNUNET_TESTBED_Operation *
1030 {
1031  struct GNUNET_TESTBED_Operation *op;
1032 
1033  op = GNUNET_new (struct GNUNET_TESTBED_Operation);
1034  op->start = start;
1035  op->state = OP_STATE_INIT;
1036  op->release = release;
1037  op->cb_cls = cls;
1038  return op;
1039 }
1040 
1041 
/**
 * Create an operation queue.
 *
 * For a queue of type OPERATION_QUEUE_TYPE_FIXED, @a max_active is the
 * fixed limit.  For any other type a feedback context is attached and
 * @a max_active becomes its upper bound (max_active_bound).
 *
 * NOTE(review): the line carrying the function name and first parameter,
 * and two statements of the else branch, are absent from this listing.
 * The file's symbol index gives the signature as
 * GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
 *                                         unsigned int max_active).
 *
 * @param max_active limit (fixed queues) or bound (other queues) on the
 *          number of operations that may be active in parallel
 * @return handle to the queue
 */
1050 struct OperationQueue *
1052  unsigned int max_active)
1053 {
1054  struct OperationQueue *queue;
1055  struct FeedbackCtx *fctx;
1056 
1057  queue = GNUNET_new (struct OperationQueue);
1058  queue->type = type;
1059  if (OPERATION_QUEUE_TYPE_FIXED == type)
1060  {
1061  queue->max_active = max_active;
1062  }
1063  else
1064  {
1065  fctx = GNUNET_new (struct FeedbackCtx);
1066  fctx->max_active_bound = max_active;
1068  queue->fctx = fctx;
1070  }
1071  return queue;
1072 }
1073 
1074 
/**
 * Cleanup the given operation queue.  For adaptive queues the time slots
 * and the feedback context are released as well.
 *
 * NOTE(review): the line carrying the function name and parameter, and one
 * statement before the feedback context is freed, are absent from this
 * listing.  The file's symbol index gives the signature as
 * queue_destroy (struct OperationQueue *queue).
 */
1080 static void
1082 {
1083  struct FeedbackCtx *fctx;
1084 
1085  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1086  {
1087  cleanup_tslots (queue);
1088  fctx = queue->fctx;
1090  GNUNET_free (fctx);
1091  }
1092  GNUNET_free (queue);
1093 }
1094 
1095 
1103 void
1105 {
1106  if (GNUNET_YES != is_queue_empty (queue))
1107  {
1108  GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
1109  queue->expired = 1;
1110  GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
1111  return;
1112  }
1113  queue_destroy (queue);
1114 }
1115 
1116 
/**
 * Destroys the operation queue if it is empty.
 *
 * NOTE(review): the line carrying the function name and parameter, and the
 * statement executed for an empty queue before GNUNET_YES is returned, are
 * absent from this listing.  The file's symbol index gives the signature
 * as GNUNET_TESTBED_operation_queue_destroy_empty_ (struct OperationQueue *queue).
 *
 * @return GNUNET_NO if the queue is not empty; GNUNET_YES otherwise
 */
1124 int
1126 {
1127  if (GNUNET_NO == is_queue_empty (queue))
1128  return GNUNET_NO;
1130  return GNUNET_YES;
1131 }
1132 
1133 
1140 static void
1142 {
1143  struct QueueEntry *entry;
1144  struct QueueEntry *entry2;
1145 
1146  entry = opq->wq_head;
1147  while (NULL != entry)
1148  {
1149  entry2 = entry->next;
1150  if (GNUNET_NO == check_readiness (entry->op))
1151  break;
1152  entry = entry2;
1153  }
1154 }
1155 
1156 
1165 void
1167  unsigned int max_active)
1168 {
1169  struct QueueEntry *entry;
1170 
1171  queue->max_active = max_active;
1172  queue->overload = 0;
1173  while ((queue->active > queue->max_active)
1174  && (NULL != (entry = queue->rq_head)))
1175  defer (entry->op);
1176  recheck_waiting (queue);
1177 }
1178 
1179 
1191 void
1193  struct GNUNET_TESTBED_Operation *op,
1194  unsigned int nres)
1195 {
1196  unsigned int qsize;
1197 
1198  GNUNET_assert (0 < nres);
1199  qsize = op->nqueues;
1200  GNUNET_array_append (op->queues, op->nqueues, queue);
1201  GNUNET_array_append (op->nres, qsize, nres);
1202  GNUNET_assert (qsize == op->nqueues);
1203 }
1204 
1205 
/**
 * Add an operation to a queue.  The operation is taken to need a single
 * unit of the queue's resources.
 *
 * @param queue queue to add the operation to
 * @param op operation to add to the queue
 */
void
GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
                                        struct GNUNET_TESTBED_Operation *op)
{
  /* plain call: a void function may not `return` an expression in ISO C */
  GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
}
1223 
1224 
/**
 * Marks the given operation as waiting on the queues and checks right away
 * whether it can be made ready.
 *
 * NOTE(review): the line carrying the function name and parameter, and the
 * statement between the assertion and the readiness check, are absent from
 * this listing.  The file's symbol index gives the signature as
 * GNUNET_TESTBED_operation_begin_wait_ (struct GNUNET_TESTBED_Operation *op).
 */
1234 void
1236 {
1237  GNUNET_assert (NULL == op->rq_entry);
 /* the result is deliberately ignored */
1239  (void) check_readiness (op);
1240 }
1241 
1242 
/**
 * Marks an active operation as inactive and then rechecks the waiting
 * lists of all queues the operation belongs to.
 *
 * NOTE(review): the line carrying the function name and parameter, and the
 * first two statements of the body, are absent from this listing.  The
 * file's symbol index gives the signature as
 * GNUNET_TESTBED_operation_inactivate_ (struct GNUNET_TESTBED_Operation *op).
 */
1252 void
1254 {
1255  struct OperationQueue **queues;
1256  size_t ms;
1257  unsigned int nqueues;
1258  unsigned int i;
1259 
1262  nqueues = op->nqueues;
1263  ms = sizeof(struct OperationQueue *) * nqueues;
1264  queues = GNUNET_malloc (ms);
1265  /* Cloning is needed as the operation be released by waiting operations and
1266  hence its nqueues memory ptr will be freed */
1267  GNUNET_memcpy (queues, op->queues, ms);
 /* from here on only the private copy is used; `op` is not touched again */
1268  for (i = 0; i < nqueues; i++)
1269  recheck_waiting (queues[i]);
1270  GNUNET_free (queues);
1271 }
1272 
1273 
/**
 * Marks an inactive operation as active.
 *
 * NOTE(review): the line carrying the function name and parameter, and
 * every statement of the body, are absent from this listing.  The file's
 * symbol index gives the signature as
 * GNUNET_TESTBED_operation_activate_ (struct GNUNET_TESTBED_Operation *op).
 */
1281 void
1283 {
1286 }
1287 
1288 
/**
 * An operation is 'done' (was cancelled or finished); remove it from the
 * queues and release associated resources.
 *
 * An operation still in its initial state is simply freed.  Otherwise its
 * entry is unlinked from every queue, resources accounted as active are
 * returned (followed by a recheck of that queue's waiting list), the
 * release callback is invoked if set, and the operation is freed.
 *
 * NOTE(review): the line carrying the function name and parameter, the
 * statement controlled by the OP_STATE_INACTIVE test, and one statement
 * after the loop are absent from this listing.  The file's symbol index
 * gives the signature as
 * GNUNET_TESTBED_operation_release_ (struct GNUNET_TESTBED_Operation *op).
 */
1295 void
1297 {
1298  struct QueueEntry *entry;
1299  struct OperationQueue *opq;
1300  unsigned int i;
1301 
 /* never queued: nothing but the handle itself to free */
1302  if (OP_STATE_INIT == op->state)
1303  {
1304  GNUNET_free (op);
1305  return;
1306  }
1307  if (OP_STATE_READY == op->state)
1308  rq_remove (op);
 /* NOTE(review): the statement belonging to the following `if` is missing
    here, which makes the ACTIVE test below look nested under it */
1309  if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1311  if (OP_STATE_ACTIVE == op->state)
1312  update_tslots (op);
1313  GNUNET_assert (NULL != op->queues);
1314  GNUNET_assert (NULL != op->qentries);
1315  for (i = 0; i < op->nqueues; i++)
1316  {
1317  entry = op->qentries[i];
1318  remove_queue_entry (op, i);
1319  opq = op->queues[i];
1320  switch (op->state)
1321  {
1322  case OP_STATE_INIT:
1323  case OP_STATE_INACTIVE:
1324  GNUNET_assert (0);
1325  break;
1326 
1327  case OP_STATE_WAITING:
1328  break;
1329 
1330  case OP_STATE_ACTIVE:
1331  case OP_STATE_READY:
 /* give the resources back and let waiting operations use them */
1332  GNUNET_assert (0 != opq->active);
1333  GNUNET_assert (opq->active >= entry->nres);
1334  opq->active -= entry->nres;
1335  recheck_waiting (opq);
1336  break;
1337  }
1338  GNUNET_free (entry);
1339  }
1341  GNUNET_free (op->queues);
1342  GNUNET_free (op->nres);
1343  if (NULL != op->release)
1344  op->release (op->cb_cls);
1345  GNUNET_free (op);
1346 }
1347 
1348 
1354 void
1356 {
1357  op->failed = GNUNET_YES;
1358 }
1359 
1360 
/**
 * Cleanup expired operation queues.  Declared with the `destructor`
 * attribute.  Every queue in the array of expired queues is destroyed; if
 * any of them still holds operations a warning text is emitted.
 *
 * NOTE(review): the first line of the statement controlled by `if (warn)`
 * is absent from this listing; only its string argument is visible.
 */
1365 void __attribute__ ((destructor))
1366 GNUNET_TESTBED_operations_fini ()
1367 {
1368  struct OperationQueue *queue;
1369  unsigned int i;
1370  int warn = 0;
1371 
1372  for (i = 0; i < n_expired_opqs; i++)
1373  {
1374  queue = expired_opqs[i];
 /* remember that a non-empty queue was seen; it is destroyed regardless */
1375  if (GNUNET_NO == is_queue_empty (queue))
1376  warn = 1;
1377  queue_destroy (queue);
1378  }
1379  GNUNET_free_non_null (expired_opqs);
1380  n_expired_opqs = 0;
1381  if (warn)
1383  "Be disciplined. Some operations were not marked as done.\n");
1384 }
1385 
1386 
1387 /* end of testbed_api_operations.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
static void process_rq_task(void *cls)
Processes the ready queue by calling the operation start callback of the operation at the head...
static unsigned int parallelism
State of an evaluate operation with another peer.
unsigned int nfailed
Number of operations that have failed.
void GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
Marks an operation as failed.
void * cb_cls
Closure for callbacks.
static int is_queue_empty(struct OperationQueue *opq)
Checks if the given operation queue is empty or not.
struct GNUNET_TIME_Relative tsum
Accumulated time.
static struct OperationQueue ** expired_opqs
Array of operation queues which are to be destroyed.
Internal API to access the 'operations' subsystem
uint64_t rel_value_us
The actual value.
void GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op, unsigned int nres)
Add an operation to a queue.
unsigned int * nres
Array of number of resources an operation need from each queue.
struct TimeSlot * next
DLL next pointer.
struct QueueEntry * aq_head
DLL head for the active queue.
struct ReadyQueueEntry * prev
prev ptr for DLL
unsigned int nres
How many units of resources does the operation need.
int GNUNET_TESTBED_SD_deviation_factor_(struct SDHandle *h, unsigned int amount, int *factor)
Calculates the factor by which the given amount differs.
Opaque handle for calculating SD.
struct QueueEntry * rq_tail
DLL tail for the ready queue.
static void adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
Cleans up the existing timing slots and sets new timing slots in the given queue to accommodate given ...
struct GNUNET_TESTBED_Operation * op
The operation associated with this entry.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
The operation is inactive.
struct TimeSlot * tslots_head
Head pointer for DLL of tslots allocated to this operation.
struct GNUNET_TESTBED_Operation * GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, OperationRelease release)
Create an 'operation' to be performed.
void GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
An operation is 'done' (was cancelled or finished); remove it from the queues and release associated ...
static void queue_destroy(struct OperationQueue *queue)
Cleanup the given operation queue.
Operation queue which permits a fixed maximum number of operations to be active at any time...
static void cleanup_tslots(struct OperationQueue *queue)
Cleans up the array of timeslots of an operation queue.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
OperationRelease release
Function to call to clean up after the operation (which may or may not have been started yet)...
struct QueueEntry ** qentries
Array of operation queue entries corresponding to this operation in operation queues for this operati...
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
struct QueueEntry * nq_head
DLL head for the inactive queue.
static void change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
Changes the state of the operation while moving its associated queue entries in the operation's opera...
#define GNUNET_NO
Definition: gnunet_common.h:78
void(* OperationRelease)(void *cls)
Function to call to cancel an operation (release all associated resources).
The operation is just created and is in initial state.
static struct ReadyQueueEntry * rq_head
DLL head for the ready queue.
struct GNUNET_SCHEDULER_Task * process_rq_task_id
The id of the task to process the ready queue.
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static void update_tslots(struct GNUNET_TESTBED_Operation *op)
Update tslots with the operation's completion time.
void GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op)
Add an operation to a queue.
struct ReadyQueueEntry * next
next ptr for DLL
struct OperationQueue * GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, unsigned int max_active)
Create an operation queue.
unsigned int nqueues
Number of queues in the operation queues array.
Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE.
Opaque handle to an abstract operation to be executed by the testing framework.
static int check_readiness(struct GNUNET_TESTBED_Operation *op)
Checks for the readiness of an operation and schedules an operation start task.
static void defer(struct GNUNET_TESTBED_Operation *op)
Defers a ready to be executed operation back to waiting.
struct QueueEntry * next
The next DLL pointer.
enum State state
current state of profiling
unsigned int max_active_bound
Bound on the maximum number of operations which can be active.
struct SDHandle * GNUNET_TESTBED_SD_init_(unsigned int max_cnt)
Initialize standard deviation calculation handle.
void(* OperationStart)(void *cls)
Function to call to start an operation once all queues the operation is part of declare that the oper...
struct QueueEntry * rq_head
DLL head for the ready queue.
The operation is currently waiting for resources.
#define GNUNET_MAX(a, b)
Definition: gnunet_common.h:82
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1280
struct QueueEntry * wq_tail
DLL tail for the wait queue.
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
struct Queue * queue
Queue this entry is queued with.
struct GNUNET_TIME_Absolute tstart
The time at which the operation is started.
static void recheck_waiting(struct OperationQueue *opq)
Rechecks if any of the operations in the given operation queue's waiting list can be made active...
static void rq_remove(struct GNUNET_TESTBED_Operation *op)
Removes an operation from the ready queue.
int GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
Destroys the operation queue if it is empty.
static int decide_capacity(struct OperationQueue *opq, struct QueueEntry *entry, struct GNUNET_TESTBED_Operation ***ops_, unsigned int *n_ops_)
Checks if the given operation queue has enough resources to provide for the operation of the given qu...
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct TimeSlot * alloc_head
Head for DLL of time slots which are free to be allocated to operations.
void GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
Marks an inactive operation as active.
unsigned int overload
The number of resources occupied by failed operations in the current shot.
void GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
Destroys an operation queue.
struct SDHandle * sd
Handle for calculating standard deviation.
struct QueueEntry * prev
The prev DLL pointer.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
OperationQueueType
The type of operation queue.
The operation is ready to be started.
struct ReadyQueueEntry * rq_entry
Entry corresponding to this operation in ready queue.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct TimeSlot * tslots_tail
Tail pointer for DLL of tslots allocated to this operation.
enum OperationQueueType type
The type of this operation queue.
static void remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
Removes a queue entry of an operation from one of the operation queues' lists depending on the state ...
#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE
The number of parallel operations we start with by default for adaptive queues.
void GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, unsigned int max_active)
Function to reset the maximum number of operations in the given queue.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
An entry in the operation queue.
unsigned int active
Number of operations that are currently active in this queue.
functions to calculate standard deviation
#define GNUNET_array_append(arr, size, element)
Append an element to a list (growing the list by one).
enum OperationState state
The state of the operation.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_add(struct GNUNET_TIME_Relative a1, struct GNUNET_TIME_Relative a2)
Add relative times together.
Definition: time.c:579
Queue of operations where we can only support a certain number of concurrent operations of a particul...
struct TimeSlot * alloc_tail
Tail for DLL of time slots which are free to be allocated to operations.
static void rq_add(struct GNUNET_TESTBED_Operation *op)
Adds the operation to the ready queue and starts the 'process_rq_task'.
void GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
Marks the given operation as waiting on the queues.
struct TimeSlot * tslots_freeptr
Pointer to the chunk of time slots.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:375
An entry in the ready queue (implemented as DLL)
struct OperationQueue ** queues
Array of operation queues this Operation belongs to.
void __attribute__((destructor))
Cleanup expired operation queues.
int failed
Is this a failed operation?
static struct ReadyQueueEntry * rq_tail
DLL tail for the ready queue.
struct QueueEntry * aq_tail
DLL tail for the active queue.
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
struct QueueEntry * nq_tail
DLL tail for the inactive queue.
#define ADAPTIVE_QUEUE_DEFAULT_HISTORY
The number of readings containing past operations' timing information that we keep track of for adapt...
static void adapt_parallelism(struct OperationQueue *queue)
Adapts parallelism in an adaptive queue by using the statistical data from the feedback context...
void GNUNET_TESTBED_SD_add_data_(struct SDHandle *h, unsigned int amount)
Add a reading to SD.
static unsigned int n_expired_opqs
Number of expired operation queues in the above array.
struct TimeSlot * prev
DLL prev pointer.
void GNUNET_TESTBED_SD_destroy_(struct SDHandle *h)
Frees the memory allocated to the SD handle.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_TIME_Relative GNUNET_TIME_relative_divide(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Divide relative time by a given factor.
Definition: time.c:527
A slot to record time taken by an operation.
unsigned int max_active
Max number of operations which can be active at any time in this queue.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
static void merge_ops(struct GNUNET_TESTBED_Operation ***old, unsigned int *n_old, struct GNUNET_TESTBED_Operation **new, unsigned int n_new)
Merges an array of operations into another, eliminating duplicates.
The operation has started and is active.
struct GNUNET_TESTBED_Operation * op
The operation to which this timeslot is currently allocated to.
void GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
Marks an active operation as inactive - the operation will be kept in a ready-to-be-released state an...
unsigned int tslots_filled
Number of time slots filled so far.
struct OperationQueue * queue
This operation queue to which this time slot belongs to.
OperationStart start
Function to call when we have the resources to begin the operation.
struct FeedbackCtx * fctx
Feedback context; only relevant for adaptive operation queues.
static void assign_timeslot(struct GNUNET_TESTBED_Operation *op, struct OperationQueue *queue)
Assigns the given operation a time slot from the given operation queue.
Operation queue which adapts the number of operations to be active based on the operation completion ...
struct QueueEntry * wq_head
DLL head for the wait queue.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_TESTBED_Operation * op
The operation this entry holds.
#define GNUNET_free(ptr)
Wrapper around free.
unsigned int nvals
Number of timing values accumulated.
Time for relative time used by GNUnet, in microseconds.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:966
unsigned int expired
Is this queue marked for expiry?