GNUnet  0.10.x
testbed_api_operations.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2008--2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30 #include "testbed_api_sd.h"
31 
36 #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
37 
42 #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
43 
47 struct QueueEntry {
51  struct QueueEntry *next;
52 
56  struct QueueEntry *prev;
57 
62 
66  unsigned int nres;
67 };
68 
69 
74 struct OperationQueue;
75 
76 
80 struct TimeSlot {
84  struct TimeSlot *next;
85 
89  struct TimeSlot *prev;
90 
95 
100 
104  struct GNUNET_TIME_Relative tsum;
105 
109  unsigned int nvals;
110 };
111 
112 
116 struct FeedbackCtx {
120  struct SDHandle *sd;
121 
126 
131 
137 
141  unsigned int tslots_filled;
142 
146  unsigned int max_active_bound;
147 
151  unsigned int nfailed;
152 };
153 
154 
165 
170 
176 
181 
187 
192 
199 
204 
209  struct FeedbackCtx *fctx;
210 
215 
219  unsigned int active;
220 
227  unsigned int max_active;
228 
234  unsigned int overload;
235 
239  unsigned int expired;
240 };
241 
242 
251 
256 
261 
266 
274 };
275 
276 
285 
290 
295 };
296 
297 
306 
312 
316  void *cb_cls;
317 
322 
328 
333  unsigned int *nres;
334 
340 
345 
350 
354  struct GNUNET_TIME_Absolute tstart;
355 
359  unsigned int nqueues;
360 
365 
369  int failed;
370 };
371 
375 static struct ReadyQueueEntry *rq_head;
376 
380 static struct ReadyQueueEntry *rq_tail;
381 
385 static struct OperationQueue **expired_opqs;
386 
390 static unsigned int n_expired_opqs;
391 
396 
397 
405 static void
407  struct OperationQueue *queue)
408 {
409  struct FeedbackCtx *fctx = queue->fctx;
410  struct TimeSlot *tslot;
411 
413  tslot = fctx->alloc_head;
414  GNUNET_assert(NULL != tslot);
417  tslot->op = op;
418 }
419 
420 
428 static void
429 remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
430 {
431  struct OperationQueue *opq;
432  struct QueueEntry *entry;
433 
434  opq = op->queues[index];
435  entry = op->qentries[index];
436  switch (op->state)
437  {
438  case OP_STATE_INIT:
439  GNUNET_assert(0);
440  break;
441 
442  case OP_STATE_WAITING:
443  GNUNET_CONTAINER_DLL_remove(opq->wq_head, opq->wq_tail, entry);
444  break;
445 
446  case OP_STATE_READY:
447  GNUNET_CONTAINER_DLL_remove(opq->rq_head, opq->rq_tail, entry);
448  break;
449 
450  case OP_STATE_ACTIVE:
451  GNUNET_CONTAINER_DLL_remove(opq->aq_head, opq->aq_tail, entry);
452  break;
453 
454  case OP_STATE_INACTIVE:
455  GNUNET_CONTAINER_DLL_remove(opq->nq_head, opq->nq_tail, entry);
456  break;
457  }
458 }
459 
460 
468 static void
470 {
471  struct QueueEntry *entry;
472  struct OperationQueue *opq;
473  unsigned int cnt;
474  unsigned int s;
475 
476  GNUNET_assert(OP_STATE_INIT != state);
477  GNUNET_assert(NULL != op->queues);
478  GNUNET_assert(NULL != op->nres);
479  GNUNET_assert((OP_STATE_INIT == op->state) || (NULL != op->qentries));
480  GNUNET_assert(op->state != state);
481  for (cnt = 0; cnt < op->nqueues; cnt++)
482  {
483  if (OP_STATE_INIT == op->state)
484  {
485  entry = GNUNET_new(struct QueueEntry);
486  entry->op = op;
487  entry->nres = op->nres[cnt];
488  s = cnt;
489  GNUNET_array_append(op->qentries, s, entry);
490  }
491  else
492  {
493  entry = op->qentries[cnt];
494  remove_queue_entry(op, cnt);
495  }
496  opq = op->queues[cnt];
497  switch (state)
498  {
499  case OP_STATE_INIT:
500  GNUNET_assert(0);
501  break;
502 
503  case OP_STATE_WAITING:
505  break;
506 
507  case OP_STATE_READY:
509  break;
510 
511  case OP_STATE_ACTIVE:
513  break;
514 
515  case OP_STATE_INACTIVE:
517  break;
518  }
519  }
520  op->state = state;
521 }
522 
523 
530 static void
532 {
533  GNUNET_assert(NULL != op->rq_entry);
534  GNUNET_CONTAINER_DLL_remove(rq_head, rq_tail, op->rq_entry);
535  GNUNET_free(op->rq_entry);
536  op->rq_entry = NULL;
537  if ((NULL == rq_head) && (NULL != process_rq_task_id))
538  {
539  GNUNET_SCHEDULER_cancel(process_rq_task_id);
540  process_rq_task_id = NULL;
541  }
542 }
543 
544 
553 static void
554 process_rq_task(void *cls)
555 {
557  struct OperationQueue *queue;
558  unsigned int cnt;
559 
560  process_rq_task_id = NULL;
561  GNUNET_assert(NULL != rq_head);
562  GNUNET_assert(NULL != (op = rq_head->op));
563  rq_remove(op);
564  if (NULL != rq_head)
565  process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
567  for (cnt = 0; cnt < op->nqueues; cnt++)
568  {
569  queue = op->queues[cnt];
570  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
571  assign_timeslot(op, queue);
572  }
574  if (NULL != op->start)
575  op->start(op->cb_cls);
576 }
577 
578 
584 static void
586 {
587  struct ReadyQueueEntry *rq_entry;
588 
589  GNUNET_assert(NULL == op->rq_entry);
590  rq_entry = GNUNET_new(struct ReadyQueueEntry);
591  rq_entry->op = op;
592  GNUNET_CONTAINER_DLL_insert_tail(rq_head, rq_tail, rq_entry);
593  op->rq_entry = rq_entry;
594  if (NULL == process_rq_task_id)
595  process_rq_task_id = GNUNET_SCHEDULER_add_now(&process_rq_task, NULL);
596 }
597 
598 
606 static int
608 {
609  if ((NULL != opq->wq_head)
610  || (NULL != opq->rq_head)
611  || (NULL != opq->aq_head)
612  || (NULL != opq->nq_head))
613  return GNUNET_NO;
614  return GNUNET_YES;
615 }
616 
617 
634 static int
636  struct QueueEntry *entry,
637  struct GNUNET_TESTBED_Operation ***ops_,
638  unsigned int *n_ops_)
639 {
640  struct QueueEntry **evict_entries;
641  struct GNUNET_TESTBED_Operation **ops;
643  unsigned int n_ops;
644  unsigned int n_evict_entries;
645  unsigned int need;
646  unsigned int max;
647  int deficit;
648  int rval;
649 
650  GNUNET_assert(NULL != (op = entry->op));
651  GNUNET_assert(0 < (need = entry->nres));
652  ops = NULL;
653  n_ops = 0;
654  evict_entries = NULL;
655  n_evict_entries = 0;
656  rval = GNUNET_YES;
658  {
659  GNUNET_assert(NULL != opq->fctx);
660  GNUNET_assert(opq->max_active >= opq->overload);
661  max = opq->max_active - opq->overload;
662  }
663  else
664  max = opq->max_active;
665  if (opq->active > max)
666  {
667  rval = GNUNET_NO;
668  goto ret;
669  }
670  if ((opq->active + need) <= max)
671  goto ret;
672  deficit = need - (max - opq->active);
673  for (entry = opq->nq_head;
674  (0 < deficit) && (NULL != entry);
675  entry = entry->next)
676  {
677  GNUNET_array_append(evict_entries, n_evict_entries, entry);
678  deficit -= entry->nres;
679  }
680  if (0 < deficit)
681  {
682  rval = GNUNET_NO;
683  goto ret;
684  }
685  for (n_ops = 0; n_ops < n_evict_entries;)
686  {
687  op = evict_entries[n_ops]->op;
688  GNUNET_array_append(ops, n_ops, op); /* increments n-ops */
689  }
690 
691 ret:
692  GNUNET_free_non_null(evict_entries);
693  if (NULL != ops_)
694  *ops_ = ops;
695  else
696  GNUNET_free(ops);
697  if (NULL != n_ops_)
698  *n_ops_ = n_ops;
699  return rval;
700 }
701 
702 
/**
 * Merges an array of operations into another, skipping every operation
 * that is already contained in the destination array.
 *
 * Membership is tested against the elements the destination array held on
 * entry; operations appended during this call are not re-checked.
 *
 * @param old in/out: pointer to the destination array; it may be reallocated
 *          by GNUNET_array_append(), in which case *old is updated
 * @param n_old in/out: number of elements in the destination array
 * @param new the operations to merge into the destination array
 * @param n_new number of elements in the 'new' array
 */
static void
merge_ops(struct GNUNET_TESTBED_Operation ***old,
          unsigned int *n_old,
          struct GNUNET_TESTBED_Operation **new,
          unsigned int n_new)
{
  struct GNUNET_TESTBED_Operation **cur;
  unsigned int i;
  unsigned int j;
  unsigned int n_cur;

  GNUNET_assert(NULL != old);
  n_cur = *n_old;
  cur = *old;
  for (i = 0; i < n_new; i++)
  {
    for (j = 0; j < *n_old; j++)
    {
      if (new[i] == cur[j])
        break;
    }
    if (j < *n_old)
      continue; /* new[i] is already part of the destination array */
    /* Append the candidate itself.  'j' indexes the old array and equals
       *n_old here, so it must not be used to index 'new'. */
    GNUNET_array_append(cur, n_cur, new[i]);
  }
  *old = cur;
  *n_old = n_cur;
}
740 
741 
742 
748 static int
750 {
751  struct GNUNET_TESTBED_Operation **evict_ops;
752  struct GNUNET_TESTBED_Operation **ops;
753  unsigned int n_ops;
754  unsigned int n_evict_ops;
755  unsigned int i;
756 
757  GNUNET_assert(NULL == op->rq_entry);
759  evict_ops = NULL;
760  n_evict_ops = 0;
761  for (i = 0; i < op->nqueues; i++)
762  {
763  ops = NULL;
764  n_ops = 0;
765  if (GNUNET_NO == decide_capacity(op->queues[i], op->qentries[i],
766  &ops, &n_ops))
767  {
768  GNUNET_free_non_null(evict_ops);
769  return GNUNET_NO;
770  }
771  if (NULL == ops)
772  continue;
773  merge_ops(&evict_ops, &n_evict_ops, ops, n_ops);
774  GNUNET_free(ops);
775  }
776  if (NULL != evict_ops)
777  {
778  for (i = 0; i < n_evict_ops; i++)
779  GNUNET_TESTBED_operation_release_(evict_ops[i]);
780  GNUNET_free(evict_ops);
781  evict_ops = NULL;
782  /* Evicting the operations should schedule this operation */
784  return GNUNET_YES;
785  }
786  for (i = 0; i < op->nqueues; i++)
787  op->queues[i]->active += op->nres[i];
789  rq_add(op);
790  return GNUNET_YES;
791 }
792 
793 
799 static void
801 {
802  unsigned int i;
803 
805  rq_remove(op);
806  for (i = 0; i < op->nqueues; i++)
807  {
808  GNUNET_assert(op->queues[i]->active >= op->nres[i]);
809  op->queues[i]->active -= op->nres[i];
810  }
812 }
813 
814 
822 static void
824 {
825  struct FeedbackCtx *fctx = queue->fctx;
826  struct TimeSlot *tslot;
828  unsigned int cnt;
829 
830  GNUNET_assert(NULL != fctx);
831  for (cnt = 0; cnt < queue->max_active; cnt++)
832  {
833  tslot = &fctx->tslots_freeptr[cnt];
834  op = tslot->op;
835  if (NULL == op)
836  continue;
838  }
840  fctx->tslots_freeptr = NULL;
841  fctx->alloc_head = NULL;
842  fctx->alloc_tail = NULL;
843  fctx->tslots_filled = 0;
844 }
845 
846 
856 static void
858 {
859  struct FeedbackCtx *fctx = queue->fctx;
860  struct TimeSlot *tslot;
861  unsigned int cnt;
862 
863  cleanup_tslots(queue);
864  n = GNUNET_MIN(n, fctx->max_active_bound);
865  fctx->tslots_freeptr = GNUNET_malloc(n * sizeof(struct TimeSlot));
866  fctx->nfailed = 0;
867  for (cnt = 0; cnt < n; cnt++)
868  {
869  tslot = &fctx->tslots_freeptr[cnt];
870  tslot->queue = queue;
872  }
874 }
875 
876 
883 static void
885 {
886  struct GNUNET_TIME_Relative avg;
887  struct FeedbackCtx *fctx;
888  struct TimeSlot *tslot;
889  int sd;
890  unsigned int nvals;
891  unsigned int cnt;
892  unsigned int parallelism;
893 
894  avg = GNUNET_TIME_UNIT_ZERO;
895  nvals = 0;
896  fctx = queue->fctx;
897  for (cnt = 0; cnt < queue->max_active; cnt++)
898  {
899  tslot = &fctx->tslots_freeptr[cnt];
900  avg = GNUNET_TIME_relative_add(avg, tslot->tsum);
901  nvals += tslot->nvals;
902  }
903  GNUNET_assert(nvals >= queue->max_active);
904  GNUNET_assert(fctx->nfailed <= nvals);
905  nvals -= fctx->nfailed;
906  if (0 == nvals)
907  {
908  if (1 == queue->max_active)
910  else
911  adaptive_queue_set_max_active(queue, queue->max_active / 2);
912  return;
913  }
914  avg = GNUNET_TIME_relative_divide(avg, nvals);
915  GNUNET_TESTBED_SD_add_data_(fctx->sd, (unsigned int)avg.rel_value_us);
916  if (GNUNET_SYSERR ==
918  (unsigned int)avg.rel_value_us,
919  &sd))
920  {
921  adaptive_queue_set_max_active(queue, queue->max_active); /* no change */
922  return;
923  }
924 
925  parallelism = 0;
926  if (-1 == sd)
927  parallelism = queue->max_active + 1;
928  if (sd <= -2)
929  parallelism = queue->max_active * 2;
930  if (1 == sd)
931  parallelism = queue->max_active - 1;
932  if (2 <= sd)
933  parallelism = queue->max_active / 2;
934  parallelism = GNUNET_MAX(parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
935  adaptive_queue_set_max_active(queue, parallelism);
936 
937 #if 0
938  /* old algorithm */
939  if (sd < 0)
940  sd = 0;
941  GNUNET_assert(0 <= sd);
942  //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
943  if (0 == sd)
944  {
945  adaptive_queue_set_max_active(queue, queue->max_active * 2);
946  return;
947  }
948  if (1 == sd)
949  {
950  adaptive_queue_set_max_active(queue, queue->max_active + 1);
951  return;
952  }
953  if (1 == queue->max_active)
954  {
956  return;
957  }
958  if (2 == sd)
959  {
960  adaptive_queue_set_max_active(queue, queue->max_active - 1);
961  return;
962  }
963  adaptive_queue_set_max_active(queue, queue->max_active / 2);
964 #endif
965 }
966 
967 
975 static void
977 {
978  struct OperationQueue *queue;
979  struct GNUNET_TIME_Relative t;
980  struct TimeSlot *tslot;
981  struct FeedbackCtx *fctx;
982  unsigned int i;
983 
985  while (NULL != (tslot = op->tslots_head)) /* update time slots */
986  {
987  queue = tslot->queue;
988  fctx = queue->fctx;
990  tslot->op = NULL;
992  tslot);
993  if (op->failed)
994  {
995  fctx->nfailed++;
996  for (i = 0; i < op->nqueues; i++)
997  if (queue == op->queues[i])
998  break;
999  GNUNET_assert(i != op->nqueues);
1000  op->queues[i]->overload += op->nres[i];
1001  }
1002  tslot->tsum = GNUNET_TIME_relative_add(tslot->tsum, t);
1003  if (0 != tslot->nvals++)
1004  continue;
1005  fctx->tslots_filled++;
1006  if (queue->max_active == fctx->tslots_filled)
1007  adapt_parallelism(queue);
1008  }
1009 }
1010 
1011 
1020 struct GNUNET_TESTBED_Operation *
1023 {
1024  struct GNUNET_TESTBED_Operation *op;
1025 
1026  op = GNUNET_new(struct GNUNET_TESTBED_Operation);
1027  op->start = start;
1028  op->state = OP_STATE_INIT;
1029  op->release = release;
1030  op->cb_cls = cls;
1031  return op;
1032 }
1033 
1034 
1043 struct OperationQueue *
1045  unsigned int max_active)
1046 {
1047  struct OperationQueue *queue;
1048  struct FeedbackCtx *fctx;
1049 
1050  queue = GNUNET_new(struct OperationQueue);
1051  queue->type = type;
1052  if (OPERATION_QUEUE_TYPE_FIXED == type)
1053  {
1054  queue->max_active = max_active;
1055  }
1056  else
1057  {
1058  fctx = GNUNET_new(struct FeedbackCtx);
1059  fctx->max_active_bound = max_active;
1061  queue->fctx = fctx;
1063  }
1064  return queue;
1065 }
1066 
1067 
1073 static void
1075 {
1076  struct FeedbackCtx *fctx;
1077 
1078  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1079  {
1080  cleanup_tslots(queue);
1081  fctx = queue->fctx;
1083  GNUNET_free(fctx);
1084  }
1085  GNUNET_free(queue);
1086 }
1087 
1088 
1096 void
1098 {
1099  if (GNUNET_YES != is_queue_empty(queue))
1100  {
1101  GNUNET_assert(0 == queue->expired); /* Are you calling twice on same queue? */
1102  queue->expired = 1;
1103  GNUNET_array_append(expired_opqs, n_expired_opqs, queue);
1104  return;
1105  }
1106  queue_destroy(queue);
1107 }
1108 
1109 
1117 int
1119 {
1120  if (GNUNET_NO == is_queue_empty(queue))
1121  return GNUNET_NO;
1123  return GNUNET_YES;
1124 }
1125 
1126 
1133 static void
1135 {
1136  struct QueueEntry *entry;
1137  struct QueueEntry *entry2;
1138 
1139  entry = opq->wq_head;
1140  while (NULL != entry)
1141  {
1142  entry2 = entry->next;
1143  if (GNUNET_NO == check_readiness(entry->op))
1144  break;
1145  entry = entry2;
1146  }
1147 }
1148 
1149 
1158 void
1160  unsigned int max_active)
1161 {
1162  struct QueueEntry *entry;
1163 
1164  queue->max_active = max_active;
1165  queue->overload = 0;
1166  while ((queue->active > queue->max_active)
1167  && (NULL != (entry = queue->rq_head)))
1168  defer(entry->op);
1169  recheck_waiting(queue);
1170 }
1171 
1172 
1184 void
1186  struct GNUNET_TESTBED_Operation *op,
1187  unsigned int nres)
1188 {
1189  unsigned int qsize;
1190 
1191  GNUNET_assert(0 < nres);
1192  qsize = op->nqueues;
1193  GNUNET_array_append(op->queues, op->nqueues, queue);
1194  GNUNET_array_append(op->nres, qsize, nres);
1195  GNUNET_assert(qsize == op->nqueues);
1196 }
1197 
1198 
/**
 * Add an operation to a queue, requesting a single unit of the queue's
 * resources.  Equivalent to GNUNET_TESTBED_operation_queue_insert2_() with
 * nres set to 1.
 *
 * @param queue the queue to add the operation to
 * @param op the operation to add
 */
void
GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue,
                                       struct GNUNET_TESTBED_Operation *op)
{
  /* plain call: ISO C does not allow 'return <expression>;' in a function
     returning void */
  GNUNET_TESTBED_operation_queue_insert2_(queue, op, 1);
}
1217 
1227 void
1229 {
1230  GNUNET_assert(NULL == op->rq_entry);
1232  (void)check_readiness(op);
1233 }
1234 
1235 
1245 void
1247 {
1248  struct OperationQueue **queues;
1249  size_t ms;
1250  unsigned int nqueues;
1251  unsigned int i;
1252 
1255  nqueues = op->nqueues;
1256  ms = sizeof(struct OperationQueue *) * nqueues;
1257  queues = GNUNET_malloc(ms);
1258  /* Cloning is needed as the operation be released by waiting operations and
1259  hence its nqueues memory ptr will be freed */
1260  GNUNET_memcpy(queues, op->queues, ms);
1261  for (i = 0; i < nqueues; i++)
1262  recheck_waiting(queues[i]);
1263  GNUNET_free(queues);
1264 }
1265 
1266 
1274 void
1276 {
1279 }
1280 
1281 
1288 void
1290 {
1291  struct QueueEntry *entry;
1292  struct OperationQueue *opq;
1293  unsigned int i;
1294 
1295  if (OP_STATE_INIT == op->state)
1296  {
1297  GNUNET_free(op);
1298  return;
1299  }
1300  if (OP_STATE_READY == op->state)
1301  rq_remove(op);
1302  if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1304  if (OP_STATE_ACTIVE == op->state)
1305  update_tslots(op);
1306  GNUNET_assert(NULL != op->queues);
1307  GNUNET_assert(NULL != op->qentries);
1308  for (i = 0; i < op->nqueues; i++)
1309  {
1310  entry = op->qentries[i];
1311  remove_queue_entry(op, i);
1312  opq = op->queues[i];
1313  switch (op->state)
1314  {
1315  case OP_STATE_INIT:
1316  case OP_STATE_INACTIVE:
1317  GNUNET_assert(0);
1318  break;
1319 
1320  case OP_STATE_WAITING:
1321  break;
1322 
1323  case OP_STATE_ACTIVE:
1324  case OP_STATE_READY:
1325  GNUNET_assert(0 != opq->active);
1326  GNUNET_assert(opq->active >= entry->nres);
1327  opq->active -= entry->nres;
1328  recheck_waiting(opq);
1329  break;
1330  }
1331  GNUNET_free(entry);
1332  }
1334  GNUNET_free(op->queues);
1335  GNUNET_free(op->nres);
1336  if (NULL != op->release)
1337  op->release(op->cb_cls);
1338  GNUNET_free(op);
1339 }
1340 
1341 
1347 void
1349 {
1350  op->failed = GNUNET_YES;
1351 }
1352 
1353 
1358 void __attribute__ ((destructor))
1359 GNUNET_TESTBED_operations_fini()
1360 {
1361  struct OperationQueue *queue;
1362  unsigned int i;
1363  int warn = 0;
1364 
1365  for (i = 0; i < n_expired_opqs; i++)
1366  {
1367  queue = expired_opqs[i];
1368  if (GNUNET_NO == is_queue_empty(queue))
1369  warn = 1;
1370  queue_destroy(queue);
1371  }
1372  GNUNET_free_non_null(expired_opqs);
1373  n_expired_opqs = 0;
1374  if (warn)
1376  "Be disciplined. Some operations were not marked as done.\n");
1377 }
1378 /* end of testbed_api_operations.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
static void process_rq_task(void *cls)
Processes the ready queue by calling the operation start callback of the operation at the head...
static unsigned int parallelism
State of an evaluate operation with another peer.
unsigned int nfailed
Number of operations that have failed.
void GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
Marks an operation as failed.
void * cb_cls
Closure for callbacks.
static int is_queue_empty(struct OperationQueue *opq)
Checks if the given operation queue is empty or not.
struct GNUNET_TIME_Relative tsum
Accumulated time.
static struct OperationQueue ** expired_opqs
Array of operation queues which are to be destroyed.
internal API to access the 'operations' subsystem
uint64_t rel_value_us
The actual value.
void GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op, unsigned int nres)
Add an operation to a queue.
unsigned int * nres
Array of number of resources an operation need from each queue.
struct TimeSlot * next
DLL next pointer.
struct QueueEntry * aq_head
DLL head for the active queue.
struct ReadyQueueEntry * prev
prev ptr for DLL
unsigned int nres
How many units of resources does the operation need.
int GNUNET_TESTBED_SD_deviation_factor_(struct SDHandle *h, unsigned int amount, int *factor)
Calculates the factor by which the given amount differs.
Opaque handle for calculating SD.
struct QueueEntry * rq_tail
DLL tail for the ready queue.
static void adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
Cleans up the existing timing slots and sets new timing slots in the given queue to accommodate given ...
struct GNUNET_TESTBED_Operation * op
The operation associated with this entry.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
The operation is inactive.
struct TimeSlot * tslots_head
Head pointer for DLL of tslots allocated to this operation.
struct GNUNET_TESTBED_Operation * GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, OperationRelease release)
Create an 'operation' to be performed.
void GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
An operation is 'done' (was cancelled or finished); remove it from the queues and release associated ...
static void queue_destroy(struct OperationQueue *queue)
Cleanup the given operation queue.
Operation queue which permits a fixed maximum number of operations to be active at any time...
static void cleanup_tslots(struct OperationQueue *queue)
Cleans up the array of timeslots of an operation queue.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
OperationRelease release
Function to call to clean up after the operation (which may or may not have been started yet)...
struct QueueEntry ** qentries
Array of operation queue entries corresponding to this operation in operation queues for this operati...
struct QueueEntry * nq_head
DLL head for the inactive queue.
static void change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
Changes the state of the operation while moving its associated queue entries in the operation's opera...
#define GNUNET_NO
Definition: gnunet_common.h:78
void(* OperationRelease)(void *cls)
Function to call to cancel an operation (release all associated resources).
The operation is just created and is in initial state.
static struct ReadyQueueEntry * rq_head
DLL head for the ready queue.
struct GNUNET_SCHEDULER_Task * process_rq_task_id
The id of the task to process the ready queue.
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static void update_tslots(struct GNUNET_TESTBED_Operation *op)
update tslots with the operation&#39;s completion time.
void GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op)
Add an operation to a queue.
static int ret
Final status code.
Definition: gnunet-arm.c:89
struct ReadyQueueEntry * next
next ptr for DLL
struct OperationQueue * GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, unsigned int max_active)
Create an operation queue.
unsigned int nqueues
Number of queues in the operation queues array.
Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE.
Opaque handle to an abstract operation to be executed by the testing framework.
static int check_readiness(struct GNUNET_TESTBED_Operation *op)
Checks for the readiness of an operation and schedules an operation start task.
static void defer(struct GNUNET_TESTBED_Operation *op)
Defers a ready to be executed operation back to waiting.
struct QueueEntry * next
The next DLL pointer.
enum State state
current state of profiling
unsigned int max_active_bound
Bound on the maximum number of operations which can be active.
struct SDHandle * GNUNET_TESTBED_SD_init_(unsigned int max_cnt)
Initialize standard deviation calculation handle.
void(* OperationStart)(void *cls)
Function to call to start an operation once all queues the operation is part of declare that the oper...
struct QueueEntry * rq_head
DLL head for the ready queue.
The operation is currently waiting for resources.
#define GNUNET_MAX(a, b)
Definition: gnunet_common.h:82
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1264
struct QueueEntry * wq_tail
DLL tail for the wait queue.
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
struct Queue * queue
Queue this entry is queued with.
struct GNUNET_TIME_Absolute tstart
The time at which the operation is started.
static void recheck_waiting(struct OperationQueue *opq)
Rechecks if any of the operations in the given operation queue's waiting list can be made active...
static void rq_remove(struct GNUNET_TESTBED_Operation *op)
Removes an operation from the ready queue.
int GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
Destroys the operation queue if it is empty.
static int decide_capacity(struct OperationQueue *opq, struct QueueEntry *entry, struct GNUNET_TESTBED_Operation ***ops_, unsigned int *n_ops_)
Checks if the given operation queue has enough resources to provide for the operation of the given qu...
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct TimeSlot * alloc_head
Head for DLL of time slots which are free to be allocated to operations.
void GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
Marks an inactive operation as active.
unsigned int overload
The number of resources occupied by failed operations in the current shot.
void GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
Destroys an operation queue.
struct SDHandle * sd
Handle for calculating standard deviation.
struct QueueEntry * prev
The prev DLL pointer.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
OperationQueueType
The type of operation queue.
The operation is ready to be started.
struct ReadyQueueEntry * rq_entry
Entry corresponding to this operation in ready queue.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct TimeSlot * tslots_tail
Tail pointer for DLL of tslots allocated to this operation.
enum OperationQueueType type
The type of this operation queue.
static void remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
Removes a queue entry of an operation from one of the operation queues' lists depending on the state ...
#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE
The number of parallel operations we start with by default for adaptive queues.
void GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, unsigned int max_active)
Function to reset the maximum number of operations in the given queue.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
An entry in the operation queue.
unsigned int active
Number of operations that are currently active in this queue.
functions to calculate standard deviation
#define GNUNET_array_append(arr, size, element)
Append an element to a list (growing the list by one).
enum OperationState state
The state of the operation.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_add(struct GNUNET_TIME_Relative a1, struct GNUNET_TIME_Relative a2)
Add relative times together.
Definition: time.c:577
Queue of operations where we can only support a certain number of concurrent operations of a particul...
struct TimeSlot * alloc_tail
Tail for DLL of time slots which are free to be allocated to operations.
static void rq_add(struct GNUNET_TESTBED_Operation *op)
Adds the operation to the ready queue and starts the 'process_rq_task'.
void GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
Marks the given operation as waiting on the queues.
struct TimeSlot * tslots_freeptr
Pointer to the chunk of time slots.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:373
An entry in the ready queue (implemented as DLL)
struct OperationQueue ** queues
Array of operation queues this Operation belongs to.
void __attribute__((destructor))
Cleanup expired operation queues.
int failed
Is this a failed operation?
static struct ReadyQueueEntry * rq_tail
DLL tail for the ready queue.
struct QueueEntry * aq_tail
DLL tail for the active queue.
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
struct QueueEntry * nq_tail
DLL tail for the inactive queue.
#define ADAPTIVE_QUEUE_DEFAULT_HISTORY
The number of readings containing past operation's timing information that we keep track of for adapt...
static void adapt_parallelism(struct OperationQueue *queue)
Adapts parallelism in an adaptive queue by using the statistical data from the feedback context...
void GNUNET_TESTBED_SD_add_data_(struct SDHandle *h, unsigned int amount)
Add a reading to SD.
static unsigned int n_expired_opqs
Number of expired operation queues in the above array.
struct TimeSlot * prev
DLL prev pointer.
void GNUNET_TESTBED_SD_destroy_(struct SDHandle *h)
Frees the memory allocated to the SD handle.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_TIME_Relative GNUNET_TIME_relative_divide(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Divide relative time by a given factor.
Definition: time.c:525
A slot to record time taken by an operation.
unsigned int max_active
Max number of operations which can be active at any time in this queue.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
static void merge_ops(struct GNUNET_TESTBED_Operation ***old, unsigned int *n_old, struct GNUNET_TESTBED_Operation **new, unsigned int n_new)
Merges an array of operations into another, eliminating duplicates.
The operation has started and is active.
struct GNUNET_TESTBED_Operation * op
The operation to which this timeslot is currently allocated to.
void GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
Marks an active operation as inactive - the operation will be kept in a ready-to-be-released state an...
unsigned int tslots_filled
Number of time slots filled so far.
struct OperationQueue * queue
This operation queue to which this time slot belongs to.
OperationStart start
Function to call when we have the resources to begin the operation.
struct FeedbackCtx * fctx
Feedback context; only relevant for adaptive operation queues.
static void assign_timeslot(struct GNUNET_TESTBED_Operation *op, struct OperationQueue *queue)
Assigns the given operation a time slot from the given operation queue.
Operation queue which adapts the number of operations to be active based on the operation completion ...
struct QueueEntry * wq_head
DLL head for the wait queue.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_TESTBED_Operation * op
The operation this entry holds.
#define GNUNET_free(ptr)
Wrapper around free.
unsigned int nvals
Number of timing values accumulated.
Time for relative time used by GNUnet, in microseconds.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956
unsigned int expired
Is this queue marked for expiry?