GNUnet  0.10.x
testbed_api_operations.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2008--2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
28 #include "platform.h"
29 #include "testbed_api_operations.h"
30 #include "testbed_api_sd.h"
31 
36 #define ADAPTIVE_QUEUE_DEFAULT_HISTORY 40
37 
42 #define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
43 
47 struct QueueEntry
48 {
52  struct QueueEntry *next;
53 
57  struct QueueEntry *prev;
58 
63 
67  unsigned int nres;
68 };
69 
70 
75 struct OperationQueue;
76 
77 
81 struct TimeSlot
82 {
86  struct TimeSlot *next;
87 
91  struct TimeSlot *prev;
92 
97 
102 
106  struct GNUNET_TIME_Relative tsum;
107 
111  unsigned int nvals;
112 };
113 
114 
119 {
123  struct SDHandle *sd;
124 
129 
134 
140 
144  unsigned int tslots_filled;
145 
149  unsigned int max_active_bound;
150 
154  unsigned int nfailed;
155 };
156 
157 
163 {
169 
174 
180 
185 
191 
196 
203 
208 
213  struct FeedbackCtx *fctx;
214 
219 
223  unsigned int active;
224 
231  unsigned int max_active;
232 
238  unsigned int overload;
239 
243  unsigned int expired;
244 };
245 
246 
251 {
256 
261 
266 
271 
279 };
280 
281 
286 {
291 
296 
301 };
302 
303 
308 {
313 
319 
323  void *cb_cls;
324 
329 
335 
340  unsigned int *nres;
341 
347 
352 
357 
361  struct GNUNET_TIME_Absolute tstart;
362 
366  unsigned int nqueues;
367 
372 
376  int failed;
377 
378 };
379 
383 static struct ReadyQueueEntry *rq_head;
384 
388 static struct ReadyQueueEntry *rq_tail;
389 
393 static struct OperationQueue **expired_opqs;
394 
398 static unsigned int n_expired_opqs;
399 
404 
405 
413 static void
415  struct OperationQueue *queue)
416 {
417  struct FeedbackCtx *fctx = queue->fctx;
418  struct TimeSlot *tslot;
419 
421  tslot = fctx->alloc_head;
422  GNUNET_assert (NULL != tslot);
423  GNUNET_CONTAINER_DLL_remove (fctx->alloc_head, fctx->alloc_tail, tslot);
425  tslot->op = op;
426 }
427 
428 
436 static void
437 remove_queue_entry (struct GNUNET_TESTBED_Operation *op, unsigned int index)
438 {
439  struct OperationQueue *opq;
440  struct QueueEntry *entry;
441 
442  opq = op->queues[index];
443  entry = op->qentries[index];
444  switch (op->state)
445  {
446  case OP_STATE_INIT:
447  GNUNET_assert (0);
448  break;
449  case OP_STATE_WAITING:
450  GNUNET_CONTAINER_DLL_remove (opq->wq_head, opq->wq_tail, entry);
451  break;
452  case OP_STATE_READY:
453  GNUNET_CONTAINER_DLL_remove (opq->rq_head, opq->rq_tail, entry);
454  break;
455  case OP_STATE_ACTIVE:
456  GNUNET_CONTAINER_DLL_remove (opq->aq_head, opq->aq_tail, entry);
457  break;
458  case OP_STATE_INACTIVE:
459  GNUNET_CONTAINER_DLL_remove (opq->nq_head, opq->nq_tail, entry);
460  break;
461  }
462 }
463 
464 
472 static void
474 {
475  struct QueueEntry *entry;
476  struct OperationQueue *opq;
477  unsigned int cnt;
478  unsigned int s;
479 
480  GNUNET_assert (OP_STATE_INIT != state);
481  GNUNET_assert (NULL != op->queues);
482  GNUNET_assert (NULL != op->nres);
483  GNUNET_assert ((OP_STATE_INIT == op->state) || (NULL != op->qentries));
484  GNUNET_assert (op->state != state);
485  for (cnt = 0; cnt < op->nqueues; cnt++)
486  {
487  if (OP_STATE_INIT == op->state)
488  {
489  entry = GNUNET_new (struct QueueEntry);
490  entry->op = op;
491  entry->nres = op->nres[cnt];
492  s = cnt;
493  GNUNET_array_append (op->qentries, s, entry);
494  }
495  else
496  {
497  entry = op->qentries[cnt];
498  remove_queue_entry (op, cnt);
499  }
500  opq = op->queues[cnt];
501  switch (state)
502  {
503  case OP_STATE_INIT:
504  GNUNET_assert (0);
505  break;
506  case OP_STATE_WAITING:
508  break;
509  case OP_STATE_READY:
511  break;
512  case OP_STATE_ACTIVE:
514  break;
515  case OP_STATE_INACTIVE:
517  break;
518  }
519  }
520  op->state = state;
521 }
522 
523 
530 static void
532 {
533  GNUNET_assert (NULL != op->rq_entry);
534  GNUNET_CONTAINER_DLL_remove (rq_head, rq_tail, op->rq_entry);
535  GNUNET_free (op->rq_entry);
536  op->rq_entry = NULL;
537  if ( (NULL == rq_head) && (NULL != process_rq_task_id) )
538  {
539  GNUNET_SCHEDULER_cancel (process_rq_task_id);
540  process_rq_task_id = NULL;
541  }
542 }
543 
544 
553 static void
554 process_rq_task (void *cls)
555 {
557  struct OperationQueue *queue;
558  unsigned int cnt;
559 
560  process_rq_task_id = NULL;
561  GNUNET_assert (NULL != rq_head);
562  GNUNET_assert (NULL != (op = rq_head->op));
563  rq_remove (op);
564  if (NULL != rq_head)
565  process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
567  for (cnt = 0; cnt < op->nqueues; cnt++)
568  {
569  queue = op->queues[cnt];
570  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
571  assign_timeslot (op, queue);
572  }
574  if (NULL != op->start)
575  op->start (op->cb_cls);
576 }
577 
578 
584 static void
586 {
587  struct ReadyQueueEntry *rq_entry;
588 
589  GNUNET_assert (NULL == op->rq_entry);
590  rq_entry = GNUNET_new (struct ReadyQueueEntry);
591  rq_entry->op = op;
592  GNUNET_CONTAINER_DLL_insert_tail (rq_head, rq_tail, rq_entry);
593  op->rq_entry = rq_entry;
594  if (NULL == process_rq_task_id)
595  process_rq_task_id = GNUNET_SCHEDULER_add_now (&process_rq_task, NULL);
596 }
597 
598 
606 static int
608 {
609  if ( (NULL != opq->wq_head)
610  || (NULL != opq->rq_head)
611  || (NULL != opq->aq_head)
612  || (NULL != opq->nq_head) )
613  return GNUNET_NO;
614  return GNUNET_YES;
615 }
616 
617 
634 static int
636  struct QueueEntry *entry,
637  struct GNUNET_TESTBED_Operation ***ops_,
638  unsigned int *n_ops_)
639 {
640  struct QueueEntry **evict_entries;
641  struct GNUNET_TESTBED_Operation **ops;
643  unsigned int n_ops;
644  unsigned int n_evict_entries;
645  unsigned int need;
646  unsigned int max;
647  int deficit;
648  int rval;
649 
650  GNUNET_assert (NULL != (op = entry->op));
651  GNUNET_assert (0 < (need = entry->nres));
652  ops = NULL;
653  n_ops = 0;
654  evict_entries = NULL;
655  n_evict_entries = 0;
656  rval = GNUNET_YES;
658  {
659  GNUNET_assert (NULL != opq->fctx);
660  GNUNET_assert (opq->max_active >= opq->overload);
661  max = opq->max_active - opq->overload;
662  }
663  else
664  max = opq->max_active;
665  if (opq->active > max)
666  {
667  rval = GNUNET_NO;
668  goto ret;
669  }
670  if ((opq->active + need) <= max)
671  goto ret;
672  deficit = need - (max - opq->active);
673  for (entry = opq->nq_head;
674  (0 < deficit) && (NULL != entry);
675  entry = entry->next)
676  {
677  GNUNET_array_append (evict_entries, n_evict_entries, entry);
678  deficit -= entry->nres;
679  }
680  if (0 < deficit)
681  {
682  rval = GNUNET_NO;
683  goto ret;
684  }
685  for (n_ops = 0; n_ops < n_evict_entries;)
686  {
687  op = evict_entries[n_ops]->op;
688  GNUNET_array_append (ops, n_ops, op); /* increments n-ops */
689  }
690 
691  ret:
692  GNUNET_free_non_null (evict_entries);
693  if (NULL != ops_)
694  *ops_ = ops;
695  else
696  GNUNET_free (ops);
697  if (NULL != n_ops_)
698  *n_ops_ = n_ops;
699  return rval;
700 }
701 
702 
712 static void
714  unsigned int *n_old,
715  struct GNUNET_TESTBED_Operation **new,
716  unsigned int n_new)
717 {
718  struct GNUNET_TESTBED_Operation **cur;
719  unsigned int i;
720  unsigned int j;
721  unsigned int n_cur;
722 
723  GNUNET_assert (NULL != old);
724  n_cur = *n_old;
725  cur = *old;
726  for (i = 0; i < n_new; i++)
727  {
728  for (j = 0; j < *n_old; j++)
729  {
730  if (new[i] == cur[j])
731  break;
732  }
733  if (j < *n_old)
734  continue;
735  GNUNET_array_append (cur, n_cur, new[j]);
736  }
737  *old = cur;
738  *n_old = n_cur;
739 }
740 
741 
742 
748 static int
750 {
751  struct GNUNET_TESTBED_Operation **evict_ops;
752  struct GNUNET_TESTBED_Operation **ops;
753  unsigned int n_ops;
754  unsigned int n_evict_ops;
755  unsigned int i;
756 
757  GNUNET_assert (NULL == op->rq_entry);
759  evict_ops = NULL;
760  n_evict_ops = 0;
761  for (i = 0; i < op->nqueues; i++)
762  {
763  ops = NULL;
764  n_ops = 0;
765  if (GNUNET_NO == decide_capacity (op->queues[i], op->qentries[i],
766  &ops, &n_ops))
767  {
768  GNUNET_free_non_null (evict_ops);
769  return GNUNET_NO;
770  }
771  if (NULL == ops)
772  continue;
773  merge_ops (&evict_ops, &n_evict_ops, ops, n_ops);
774  GNUNET_free (ops);
775  }
776  if (NULL != evict_ops)
777  {
778  for (i = 0; i < n_evict_ops; i++)
779  GNUNET_TESTBED_operation_release_ (evict_ops[i]);
780  GNUNET_free (evict_ops);
781  evict_ops = NULL;
782  /* Evicting the operations should schedule this operation */
784  return GNUNET_YES;
785  }
786  for (i = 0; i < op->nqueues; i++)
787  op->queues[i]->active += op->nres[i];
789  rq_add (op);
790  return GNUNET_YES;
791 }
792 
793 
799 static void
801 {
802  unsigned int i;
803 
805  rq_remove (op);
806  for (i = 0; i < op->nqueues; i++)
807  {
808  GNUNET_assert (op->queues[i]->active >= op->nres[i]);
809  op->queues[i]->active -= op->nres[i];
810  }
812 }
813 
814 
822 static void
824 {
825  struct FeedbackCtx *fctx = queue->fctx;
826  struct TimeSlot *tslot;
828  unsigned int cnt;
829 
830  GNUNET_assert (NULL != fctx);
831  for (cnt = 0; cnt < queue->max_active; cnt++)
832  {
833  tslot = &fctx->tslots_freeptr[cnt];
834  op = tslot->op;
835  if (NULL == op)
836  continue;
838  }
840  fctx->tslots_freeptr = NULL;
841  fctx->alloc_head = NULL;
842  fctx->alloc_tail = NULL;
843  fctx->tslots_filled = 0;
844 }
845 
846 
856 static void
858 {
859  struct FeedbackCtx *fctx = queue->fctx;
860  struct TimeSlot *tslot;
861  unsigned int cnt;
862 
863  cleanup_tslots (queue);
864  n = GNUNET_MIN (n ,fctx->max_active_bound);
865  fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
866  fctx->nfailed = 0;
867  for (cnt = 0; cnt < n; cnt++)
868  {
869  tslot = &fctx->tslots_freeptr[cnt];
870  tslot->queue = queue;
872  }
874 }
875 
876 
883 static void
885 {
886  struct GNUNET_TIME_Relative avg;
887  struct FeedbackCtx *fctx;
888  struct TimeSlot *tslot;
889  int sd;
890  unsigned int nvals;
891  unsigned int cnt;
892  unsigned int parallelism;
893 
894  avg = GNUNET_TIME_UNIT_ZERO;
895  nvals = 0;
896  fctx = queue->fctx;
897  for (cnt = 0; cnt < queue->max_active; cnt++)
898  {
899  tslot = &fctx->tslots_freeptr[cnt];
900  avg = GNUNET_TIME_relative_add (avg, tslot->tsum);
901  nvals += tslot->nvals;
902  }
903  GNUNET_assert (nvals >= queue->max_active);
904  GNUNET_assert (fctx->nfailed <= nvals);
905  nvals -= fctx->nfailed;
906  if (0 == nvals)
907  {
908  if (1 == queue->max_active)
910  else
911  adaptive_queue_set_max_active (queue, queue->max_active / 2);
912  return;
913  }
914  avg = GNUNET_TIME_relative_divide (avg, nvals);
915  GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
916  if (GNUNET_SYSERR ==
918  (unsigned int) avg.rel_value_us,
919  &sd))
920  {
921  adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
922  return;
923  }
924 
925  parallelism = 0;
926  if (-1 == sd)
927  parallelism = queue->max_active + 1;
928  if (sd <= -2)
929  parallelism = queue->max_active * 2;
930  if (1 == sd)
931  parallelism = queue->max_active - 1;
932  if (2 <= sd)
933  parallelism = queue->max_active / 2;
934  parallelism = GNUNET_MAX (parallelism, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
935  adaptive_queue_set_max_active (queue, parallelism);
936 
937 #if 0
938  /* old algorithm */
939  if (sd < 0)
940  sd = 0;
941  GNUNET_assert (0 <= sd);
942  //GNUNET_TESTBED_SD_add_data_ (fctx->sd, (unsigned int) avg.rel_value_us);
943  if (0 == sd)
944  {
945  adaptive_queue_set_max_active (queue, queue->max_active * 2);
946  return;
947  }
948  if (1 == sd)
949  {
950  adaptive_queue_set_max_active (queue, queue->max_active + 1);
951  return;
952  }
953  if (1 == queue->max_active)
954  {
956  return;
957  }
958  if (2 == sd)
959  {
960  adaptive_queue_set_max_active (queue, queue->max_active - 1);
961  return;
962  }
963  adaptive_queue_set_max_active (queue, queue->max_active / 2);
964 #endif
965 }
966 
967 
975 static void
977 {
978  struct OperationQueue *queue;
979  struct GNUNET_TIME_Relative t;
980  struct TimeSlot *tslot;
981  struct FeedbackCtx *fctx;
982  unsigned int i;
983 
985  while (NULL != (tslot = op->tslots_head)) /* update time slots */
986  {
987  queue = tslot->queue;
988  fctx = queue->fctx;
990  tslot->op = NULL;
992  tslot);
993  if (op->failed)
994  {
995  fctx->nfailed++;
996  for (i = 0; i < op->nqueues; i++)
997  if (queue == op->queues[i])
998  break;
999  GNUNET_assert (i != op->nqueues);
1000  op->queues[i]->overload += op->nres[i];
1001  }
1002  tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
1003  if (0 != tslot->nvals++)
1004  continue;
1005  fctx->tslots_filled++;
1006  if (queue->max_active == fctx->tslots_filled)
1007  adapt_parallelism (queue);
1008  }
1009 }
1010 
1011 
1020 struct GNUNET_TESTBED_Operation *
1023 {
1024  struct GNUNET_TESTBED_Operation *op;
1025 
1026  op = GNUNET_new (struct GNUNET_TESTBED_Operation);
1027  op->start = start;
1028  op->state = OP_STATE_INIT;
1029  op->release = release;
1030  op->cb_cls = cls;
1031  return op;
1032 }
1033 
1034 
1043 struct OperationQueue *
1045  unsigned int max_active)
1046 {
1047  struct OperationQueue *queue;
1048  struct FeedbackCtx *fctx;
1049 
1050  queue = GNUNET_new (struct OperationQueue);
1051  queue->type = type;
1052  if (OPERATION_QUEUE_TYPE_FIXED == type)
1053  {
1054  queue->max_active = max_active;
1055  }
1056  else
1057  {
1058  fctx = GNUNET_new (struct FeedbackCtx);
1059  fctx->max_active_bound = max_active;
1061  queue->fctx = fctx;
1063  }
1064  return queue;
1065 }
1066 
1067 
1073 static void
1075 {
1076  struct FeedbackCtx *fctx;
1077 
1078  if (OPERATION_QUEUE_TYPE_ADAPTIVE == queue->type)
1079  {
1080  cleanup_tslots (queue);
1081  fctx = queue->fctx;
1083  GNUNET_free (fctx);
1084  }
1085  GNUNET_free (queue);
1086 }
1087 
1088 
1096 void
1098 {
1099  if (GNUNET_YES != is_queue_empty (queue))
1100  {
1101  GNUNET_assert (0 == queue->expired); /* Are you calling twice on same queue? */
1102  queue->expired = 1;
1103  GNUNET_array_append (expired_opqs, n_expired_opqs, queue);
1104  return;
1105  }
1106  queue_destroy (queue);
1107 }
1108 
1109 
1117 int
1119 {
1120  if (GNUNET_NO == is_queue_empty (queue))
1121  return GNUNET_NO;
1123  return GNUNET_YES;
1124 }
1125 
1126 
1133 static void
1135 {
1136  struct QueueEntry *entry;
1137  struct QueueEntry *entry2;
1138 
1139  entry = opq->wq_head;
1140  while (NULL != entry)
1141  {
1142  entry2 = entry->next;
1143  if (GNUNET_NO == check_readiness (entry->op))
1144  break;
1145  entry = entry2;
1146  }
1147 }
1148 
1149 
1158 void
1160  unsigned int max_active)
1161 {
1162  struct QueueEntry *entry;
1163 
1164  queue->max_active = max_active;
1165  queue->overload = 0;
1166  while ( (queue->active > queue->max_active)
1167  && (NULL != (entry = queue->rq_head)) )
1168  defer (entry->op);
1169  recheck_waiting (queue);
1170 }
1171 
1172 
1184 void
1186  struct GNUNET_TESTBED_Operation *op,
1187  unsigned int nres)
1188 {
1189  unsigned int qsize;
1190 
1191  GNUNET_assert (0 < nres);
1192  qsize = op->nqueues;
1193  GNUNET_array_append (op->queues, op->nqueues, queue);
1194  GNUNET_array_append (op->nres, qsize, nres);
1195  GNUNET_assert (qsize == op->nqueues);
1196 }
1197 
1198 
/**
 * Add an operation to a queue.  Convenience wrapper around
 * GNUNET_TESTBED_operation_queue_insert2_() for operations which need
 * exactly one unit of the queue's resources.
 *
 * @param queue the queue to add the operation to
 * @param op the operation to add to the queue
 */
void
GNUNET_TESTBED_operation_queue_insert_ (struct OperationQueue *queue,
                                        struct GNUNET_TESTBED_Operation *op)
{
  /* Plain call: ISO C does not permit 'return <expression>;' in a function
     whose return type is void. */
  GNUNET_TESTBED_operation_queue_insert2_ (queue, op, 1);
}
1217 
1227 void
1229 {
1230  GNUNET_assert (NULL == op->rq_entry);
1232  (void) check_readiness (op);
1233 }
1234 
1235 
1245 void
1247 {
1248  struct OperationQueue **queues;
1249  size_t ms;
1250  unsigned int nqueues;
1251  unsigned int i;
1252 
1255  nqueues = op->nqueues;
1256  ms = sizeof (struct OperationQueue *) * nqueues;
1257  queues = GNUNET_malloc (ms);
1258  /* Cloning is needed as the operation be released by waiting operations and
1259  hence its nqueues memory ptr will be freed */
1260  GNUNET_memcpy (queues, op->queues, ms);
1261  for (i = 0; i < nqueues; i++)
1262  recheck_waiting (queues[i]);
1263  GNUNET_free (queues);
1264 }
1265 
1266 
1274 void
1276 {
1277 
1280 }
1281 
1282 
1289 void
1291 {
1292  struct QueueEntry *entry;
1293  struct OperationQueue *opq;
1294  unsigned int i;
1295 
1296  if (OP_STATE_INIT == op->state)
1297  {
1298  GNUNET_free (op);
1299  return;
1300  }
1301  if (OP_STATE_READY == op->state)
1302  rq_remove (op);
1303  if (OP_STATE_INACTIVE == op->state) /* Activate the operation if inactive */
1305  if (OP_STATE_ACTIVE == op->state)
1306  update_tslots (op);
1307  GNUNET_assert (NULL != op->queues);
1308  GNUNET_assert (NULL != op->qentries);
1309  for (i = 0; i < op->nqueues; i++)
1310  {
1311  entry = op->qentries[i];
1312  remove_queue_entry (op, i);
1313  opq = op->queues[i];
1314  switch (op->state)
1315  {
1316  case OP_STATE_INIT:
1317  case OP_STATE_INACTIVE:
1318  GNUNET_assert (0);
1319  break;
1320  case OP_STATE_WAITING:
1321  break;
1322  case OP_STATE_ACTIVE:
1323  case OP_STATE_READY:
1324  GNUNET_assert (0 != opq->active);
1325  GNUNET_assert (opq->active >= entry->nres);
1326  opq->active -= entry->nres;
1327  recheck_waiting (opq);
1328  break;
1329  }
1330  GNUNET_free (entry);
1331  }
1333  GNUNET_free (op->queues);
1334  GNUNET_free (op->nres);
1335  if (NULL != op->release)
1336  op->release (op->cb_cls);
1337  GNUNET_free (op);
1338 }
1339 
1340 
1346 void
1348 {
1349  op->failed = GNUNET_YES;
1350 }
1351 
1352 
1357 void __attribute__ ((destructor))
1358 GNUNET_TESTBED_operations_fini ()
1359 {
1360  struct OperationQueue *queue;
1361  unsigned int i;
1362  int warn = 0;
1363 
1364  for (i=0; i < n_expired_opqs; i++)
1365  {
1366  queue = expired_opqs[i];
1367  if (GNUNET_NO == is_queue_empty (queue))
1368  warn = 1;
1369  queue_destroy (queue);
1370  }
1371  GNUNET_free_non_null (expired_opqs);
1372  n_expired_opqs = 0;
1373  if (warn)
1375  "Be disciplined. Some operations were not marked as done.\n");
1376 
1377 }
1378 /* end of testbed_api_operations.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
static void process_rq_task(void *cls)
Processes the ready queue by calling the operation start callback of the operation at the head...
static unsigned int parallelism
State of an evaluate operation with another peer.
unsigned int nfailed
Number of operations that have failed.
void GNUNET_TESTBED_operation_mark_failed(struct GNUNET_TESTBED_Operation *op)
Marks an operation as failed.
void * cb_cls
Closure for callbacks.
static int is_queue_empty(struct OperationQueue *opq)
Checks if the given operation queue is empty or not.
struct GNUNET_TIME_Relative tsum
Accumulated time.
static struct OperationQueue ** expired_opqs
Array of operation queues which are to be destroyed.
internal API to access the 'operations' subsystem
uint64_t rel_value_us
The actual value.
void GNUNET_TESTBED_operation_queue_insert2_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op, unsigned int nres)
Add an operation to a queue.
unsigned int * nres
Array of number of resources an operation need from each queue.
struct TimeSlot * next
DLL next pointer.
struct QueueEntry * aq_head
DLL head for the active queue.
struct ReadyQueueEntry * prev
prev ptr for DLL
unsigned int nres
How many units of resources does the operation need.
int GNUNET_TESTBED_SD_deviation_factor_(struct SDHandle *h, unsigned int amount, int *factor)
Calculates the factor by which the given amount differs.
Opaque handle for calculating SD.
struct QueueEntry * rq_tail
DLL tail for the ready queue.
static void adaptive_queue_set_max_active(struct OperationQueue *queue, unsigned int n)
Cleans up the existing timing slots and sets new timing slots in the given queue to accommodate given ...
struct GNUNET_TESTBED_Operation * op
The operation associated with this entry.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
The operation is inactive.
struct TimeSlot * tslots_head
Head pointer for DLL of tslots allocated to this operation.
struct GNUNET_TESTBED_Operation * GNUNET_TESTBED_operation_create_(void *cls, OperationStart start, OperationRelease release)
Create an 'operation' to be performed.
void GNUNET_TESTBED_operation_release_(struct GNUNET_TESTBED_Operation *op)
An operation is 'done' (was cancelled or finished); remove it from the queues and release associated ...
static void queue_destroy(struct OperationQueue *queue)
Cleanup the given operation queue.
Operation queue which permits a fixed maximum number of operations to be active at any time...
static void cleanup_tslots(struct OperationQueue *queue)
Cleans up the array of timeslots of an operation queue.
OperationRelease release
Function to call to clean up after the operation (which may or may not have been started yet)...
struct QueueEntry ** qentries
Array of operation queue entries corresponding to this operation in operation queues for this operati...
struct QueueEntry * nq_head
DLL head for the inactive queue.
static void change_state(struct GNUNET_TESTBED_Operation *op, enum OperationState state)
Changes the state of the operation while moving its associated queue entries in the operation's opera...
#define GNUNET_NO
Definition: gnunet_common.h:81
void(* OperationRelease)(void *cls)
Function to call to cancel an operation (release all associated resources).
The operation is just created and is in initial state.
static struct ReadyQueueEntry * rq_head
DLL head for the ready queue.
struct GNUNET_SCHEDULER_Task * process_rq_task_id
The id of the task to process the ready queue.
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static void update_tslots(struct GNUNET_TESTBED_Operation *op)
Updates tslots with the operation's completion time.
void GNUNET_TESTBED_operation_queue_insert_(struct OperationQueue *queue, struct GNUNET_TESTBED_Operation *op)
Add an operation to a queue.
static int ret
Final status code.
Definition: gnunet-arm.c:89
struct ReadyQueueEntry * next
next ptr for DLL
struct OperationQueue * GNUNET_TESTBED_operation_queue_create_(enum OperationQueueType type, unsigned int max_active)
Create an operation queue.
unsigned int nqueues
Number of queues in the operation queues array.
Context for operation queues of type OPERATION_QUEUE_TYPE_ADAPTIVE.
Opaque handle to an abstract operation to be executed by the testing framework.
static int check_readiness(struct GNUNET_TESTBED_Operation *op)
Checks for the readiness of an operation and schedules an operation start task.
static void defer(struct GNUNET_TESTBED_Operation *op)
Defers a ready to be executed operation back to waiting.
struct QueueEntry * next
The next DLL pointer.
enum State state
current state of profiling
unsigned int max_active_bound
Bound on the maximum number of operations which can be active.
#define GNUNET_memcpy(dst, src, n)
struct SDHandle * GNUNET_TESTBED_SD_init_(unsigned int max_cnt)
Initialize standard deviation calculation handle.
void(* OperationStart)(void *cls)
Function to call to start an operation once all queues the operation is part of declare that the oper...
struct QueueEntry * rq_head
DLL head for the ready queue.
The operation is currently waiting for resources.
#define GNUNET_MAX(a, b)
Definition: gnunet_common.h:85
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1273
struct QueueEntry * wq_tail
DLL tail for the wait queue.
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:83
struct Queue * queue
Queue this entry is queued with.
struct GNUNET_TIME_Absolute tstart
The time at which the operation is started.
static void recheck_waiting(struct OperationQueue *opq)
Rechecks if any of the operations in the given operation queue's waiting list can be made active...
static void rq_remove(struct GNUNET_TESTBED_Operation *op)
Removes an operation from the ready queue.
int GNUNET_TESTBED_operation_queue_destroy_empty_(struct OperationQueue *queue)
Destroys the operation queue if it is empty.
static int decide_capacity(struct OperationQueue *opq, struct QueueEntry *entry, struct GNUNET_TESTBED_Operation ***ops_, unsigned int *n_ops_)
Checks if the given operation queue has enough resources to provide for the operation of the given qu...
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct TimeSlot * alloc_head
Head for DLL of time slots which are free to be allocated to operations.
void GNUNET_TESTBED_operation_activate_(struct GNUNET_TESTBED_Operation *op)
Marks an inactive operation as active.
unsigned int overload
The number of resources occupied by failed operations in the current shot.
void GNUNET_TESTBED_operation_queue_destroy_(struct OperationQueue *queue)
Destroys an operation queue.
struct SDHandle * sd
Handle for calculating standard deviation.
struct QueueEntry * prev
The prev DLL pointer.
#define GNUNET_SYSERR
Definition: gnunet_common.h:79
OperationQueueType
The type of operation queue.
The operation is ready to be started.
struct ReadyQueueEntry * rq_entry
Entry corresponding to this operation in ready queue.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct TimeSlot * tslots_tail
Tail pointer for DLL of tslots allocated to this operation.
enum OperationQueueType type
The type of this operation queue.
static void remove_queue_entry(struct GNUNET_TESTBED_Operation *op, unsigned int index)
Removes a queue entry of an operation from one of the operation queues' lists depending on the state ...
#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE
The number of parallel operations we start with by default for adaptive queues.
void GNUNET_TESTBED_operation_queue_reset_max_active_(struct OperationQueue *queue, unsigned int max_active)
Function to reset the maximum number of operations in the given queue.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
An entry in the operation queue.
unsigned int active
Number of operations that are currently active in this queue.
functions to calculate standard deviation
#define GNUNET_array_append(arr, size, element)
Append an element to a list (growing the list by one).
enum OperationState state
The state of the operation.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_add(struct GNUNET_TIME_Relative a1, struct GNUNET_TIME_Relative a2)
Add relative times together.
Definition: time.c:576
Queue of operations where we can only support a certain number of concurrent operations of a particul...
struct TimeSlot * alloc_tail
Tail for DLL of time slots which are free to be allocated to operations.
static void rq_add(struct GNUNET_TESTBED_Operation *op)
Adds the operation to the ready queue and starts the 'process_rq_task'.
void GNUNET_TESTBED_operation_begin_wait_(struct GNUNET_TESTBED_Operation *op)
Marks the given operation as waiting on the queues.
struct TimeSlot * tslots_freeptr
Pointer to the chunk of time slots.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:373
An entry in the ready queue (implemented as DLL)
struct OperationQueue ** queues
Array of operation queues this Operation belongs to.
void __attribute__((destructor))
Cleanup expired operation queues.
int failed
Is this a failed operation?
static struct ReadyQueueEntry * rq_tail
DLL tail for the ready queue.
struct QueueEntry * aq_tail
DLL tail for the active queue.
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
struct QueueEntry * nq_tail
DLL tail for the inactive queue.
#define ADAPTIVE_QUEUE_DEFAULT_HISTORY
The number of readings containing past operation's timing information that we keep track of for adapt...
static void adapt_parallelism(struct OperationQueue *queue)
Adapts parallelism in an adaptive queue by using the statistical data from the feedback context...
void GNUNET_TESTBED_SD_add_data_(struct SDHandle *h, unsigned int amount)
Add a reading to SD.
static unsigned int n_expired_opqs
Number of expired operation queues in the above array.
struct TimeSlot * prev
DLL prev pointer.
void GNUNET_TESTBED_SD_destroy_(struct SDHandle *h)
Frees the memory allocated to the SD handle.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_TIME_Relative GNUNET_TIME_relative_divide(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Divide relative time by a given factor.
Definition: time.c:525
A slot to record time taken by an operation.
unsigned int max_active
Max number of operations which can be active at any time in this queue.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:80
static void merge_ops(struct GNUNET_TESTBED_Operation ***old, unsigned int *n_old, struct GNUNET_TESTBED_Operation **new, unsigned int n_new)
Merges an array of operations into another, eliminating duplicates.
The operation has started and is active.
struct GNUNET_TESTBED_Operation * op
The operation to which this timeslot is currently allocated to.
void GNUNET_TESTBED_operation_inactivate_(struct GNUNET_TESTBED_Operation *op)
Marks an active operation as inactive - the operation will be kept in a ready-to-be-released state an...
unsigned int tslots_filled
Number of time slots filled so far.
struct OperationQueue * queue
The operation queue to which this time slot belongs.
OperationStart start
Function to call when we have the resources to begin the operation.
struct FeedbackCtx * fctx
Feedback context; only relevant for adaptive operation queues.
static void assign_timeslot(struct GNUNET_TESTBED_Operation *op, struct OperationQueue *queue)
Assigns the given operation a time slot from the given operation queue.
Operation queue which adapts the number of operations to be active based on the operation completion ...
struct QueueEntry * wq_head
DLL head for the wait queue.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_TESTBED_Operation * op
The operation this entry holds.
#define GNUNET_free(ptr)
Wrapper around free.
unsigned int nvals
Number of timing values accumulated.
Time for relative time used by GNUnet, in microseconds.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:965
unsigned int expired
Is this queue marked for expiry?