GNUnet  0.11.x
mq.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2012-2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 
29 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30 
31 
33 {
39 
45 
52 
57 
62 
66  void *sent_cls;
67 
74 
79 };
80 
81 
86 {
91 
97 
102 
107 
111  void *impl_state;
112 
117 
122 
127 
132 
137 
144 
149 
154 
159 
165 
171  uint32_t assoc_id;
172 
176  unsigned int queue_length;
177 
183 
188 };
189 
190 
201 void
203  const struct GNUNET_MessageHeader *mh)
204 {
205  int ret;
206 
207  ret = GNUNET_MQ_handle_message (mq->handlers, mh);
208  if (GNUNET_SYSERR == ret)
209  {
211  return;
212  }
213 }
214 
215 
228 int
230  const struct GNUNET_MessageHeader *mh)
231 {
232  const struct GNUNET_MQ_MessageHandler *handler;
233  int handled = GNUNET_NO;
234  uint16_t msize = ntohs (mh->size);
235  uint16_t mtype = ntohs (mh->type);
236 
238  "Received message of type %u and size %u\n",
239  mtype,
240  msize);
241 
242  if (NULL == handlers)
243  goto done;
244  for (handler = handlers; NULL != handler->cb; handler++)
245  {
246  if (handler->type == mtype)
247  {
248  handled = GNUNET_YES;
249  if ((handler->expected_size > msize) ||
250  ((handler->expected_size != msize) && (NULL == handler->mv)))
251  {
252  /* Too small, or not an exact size and
253  no 'mv' handler to check rest */
255  "Received malformed message of type %u\n",
256  (unsigned int) handler->type);
257  return GNUNET_SYSERR;
258  }
259  if ((NULL == handler->mv) ||
260  (GNUNET_OK == handler->mv (handler->cls, mh)))
261  {
262  /* message well-formed, pass to handler */
263  handler->cb (handler->cls, mh);
264  }
265  else
266  {
267  /* Message rejected by check routine */
269  "Received malformed message of type %u\n",
270  (unsigned int) handler->type);
271  return GNUNET_SYSERR;
272  }
273  break;
274  }
275  }
276  done:
277  if (GNUNET_NO == handled)
278  {
280  "No handler for message of type %u and size %u\n",
281  mtype,
282  msize);
283  return GNUNET_NO;
284  }
285  return GNUNET_OK;
286 }
287 
288 
299 void
301  enum GNUNET_MQ_Error error)
302 {
303  if (NULL == mq->error_handler)
304  {
306  "Got error %d, but no handler installed\n",
307  (int) error);
308  return;
309  }
311  error);
312 }
313 
314 
322 void
324 {
325  GNUNET_assert (NULL == ev->parent_queue);
326  GNUNET_free (ev);
327 }
328 
329 
336 unsigned int
338 {
339  if (GNUNET_YES != mq->in_flight)
340  {
341  return mq->queue_length;
342  }
343  return mq->queue_length - 1;
344 }
345 
346 
354 void
356  struct GNUNET_MQ_Envelope *ev)
357 {
358  if (NULL == mq)
360  "mq is NUll when sending message of type %u\n",
361  (unsigned int) ntohs (ev->mh->type));
362  GNUNET_assert (NULL != mq);
363  GNUNET_assert (NULL == ev->parent_queue);
364 
365  mq->queue_length++;
366  if (mq->queue_length >= 10000)
367  {
368  /* This would seem like a bug... */
370  "MQ with %u entries extended by message of type %u (FC broken?)\n",
371  (unsigned int) mq->queue_length,
372  (unsigned int) ntohs (ev->mh->type));
373  }
374  ev->parent_queue = mq;
375  /* is the implementation busy? queue it! */
376  if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
377  {
379  mq->envelope_tail,
380  ev);
381  return;
382  }
383  GNUNET_assert (NULL == mq->envelope_head);
384  mq->current_envelope = ev;
385 
387  "sending message of type %u, queue empty (MQ: %p)\n",
388  ntohs (ev->mh->type),
389  mq);
390 
391  mq->send_impl (mq,
392  ev->mh,
393  mq->impl_state);
394 }
395 
396 
404 struct GNUNET_MQ_Envelope *
406 {
407  struct GNUNET_MQ_Envelope *env;
408 
409  env = mq->envelope_head;
411  mq->queue_length--;
412  env->parent_queue = NULL;
413  return env;
414 }
415 
416 
424 struct GNUNET_MQ_Envelope *
426 {
427  GNUNET_assert (NULL == env->next);
428  GNUNET_assert (NULL == env->parent_queue);
429  GNUNET_assert (NULL == env->sent_cb);
431  return GNUNET_MQ_msg_copy (env->mh);
432 }
433 
434 
442 void
444  const struct GNUNET_MQ_Envelope *ev)
445 {
446  struct GNUNET_MQ_Envelope *env;
447  uint16_t msize;
448 
449  msize = ntohs (ev->mh->size);
450  env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
451  env->mh = (struct GNUNET_MessageHeader *) &env[1];
452  env->sent_cb = ev->sent_cb;
453  env->sent_cls = ev->sent_cls;
454  GNUNET_memcpy (&env[1], ev->mh, msize);
455  GNUNET_MQ_send (mq, env);
456 }
457 
458 
466 static void
468 {
469  struct GNUNET_MQ_Handle *mq = cls;
470 
471  mq->send_task = NULL;
472  /* call is only valid if we're actually currently sending
473  * a message */
474  if (NULL == mq->envelope_head)
475  return;
478  mq->envelope_tail,
479  mq->current_envelope);
480 
482  "sending message of type %u from queue\n",
483  ntohs (mq->current_envelope->mh->type));
484 
485  mq->send_impl (mq,
486  mq->current_envelope->mh,
487  mq->impl_state);
488 }
489 
490 
498 void
500 {
501  struct GNUNET_MQ_Envelope *current_envelope;
503 
504  GNUNET_assert (0 < mq->queue_length);
505  mq->queue_length--;
506  mq->in_flight = GNUNET_NO;
507  current_envelope = mq->current_envelope;
508  current_envelope->parent_queue = NULL;
509  mq->current_envelope = NULL;
510  GNUNET_assert (NULL == mq->send_task);
512  if (NULL != (cb = current_envelope->sent_cb))
513  {
514  current_envelope->sent_cb = NULL;
515  cb (current_envelope->sent_cls);
516  }
517  GNUNET_free (current_envelope);
518 }
519 
520 
531 void
533 {
534  struct GNUNET_MQ_Envelope *current_envelope;
536 
537  mq->in_flight = GNUNET_YES;
538  /* call is only valid if we're actually currently sending
539  * a message */
540  current_envelope = mq->current_envelope;
541  GNUNET_assert (NULL != current_envelope);
542  /* can't call cancel from now on anymore */
543  current_envelope->parent_queue = NULL;
544  if (NULL != (cb = current_envelope->sent_cb))
545  {
546  current_envelope->sent_cb = NULL;
547  cb (current_envelope->sent_cls);
548  }
549 }
550 
551 
564 struct GNUNET_MQ_Handle *
567  GNUNET_MQ_CancelImpl cancel,
568  void *impl_state,
569  const struct GNUNET_MQ_MessageHandler *handlers,
571  void *error_handler_cls)
572 {
573  struct GNUNET_MQ_Handle *mq;
574 
575  mq = GNUNET_new (struct GNUNET_MQ_Handle);
576  mq->send_impl = send;
577  mq->destroy_impl = destroy;
578  mq->cancel_impl = cancel;
579  mq->handlers = GNUNET_MQ_copy_handlers (handlers);
582  mq->impl_state = impl_state;
583 
584  return mq;
585 }
586 
587 
595 void
597 {
598  if (NULL == mq->handlers)
599  return;
600  for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
601  mq->handlers[i].cls = handlers_cls;
602 }
603 
604 
614 const struct GNUNET_MessageHeader *
616 {
617  GNUNET_assert (NULL != mq->current_envelope);
618  GNUNET_assert (NULL != mq->current_envelope->mh);
619  return mq->current_envelope->mh;
620 }
621 
622 
637 void *
639 {
640  return mq->impl_state;
641 }
642 
643 
644 struct GNUNET_MQ_Envelope *
645 GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
646 {
647  struct GNUNET_MQ_Envelope *ev;
648 
649  ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
650  ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
651  ev->mh->size = htons (size);
652  ev->mh->type = htons (type);
653  if (NULL != mhp)
654  *mhp = ev->mh;
655  return ev;
656 }
657 
658 
665 struct GNUNET_MQ_Envelope *
667 {
668  struct GNUNET_MQ_Envelope *mqm;
669  uint16_t size = ntohs (hdr->size);
670 
671  mqm = GNUNET_malloc (sizeof(*mqm) + size);
672  mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
673  GNUNET_memcpy (mqm->mh, hdr, size);
674  return mqm;
675 }
676 
677 
687 struct GNUNET_MQ_Envelope *
689  uint16_t base_size,
690  uint16_t type,
691  const struct GNUNET_MessageHeader *nested_mh)
692 {
693  struct GNUNET_MQ_Envelope *mqm;
694  uint16_t size;
695 
696  if (NULL == nested_mh)
697  return GNUNET_MQ_msg_ (mhp, base_size, type);
698 
699  size = base_size + ntohs (nested_mh->size);
700 
701  /* check for uint16_t overflow */
702  if (size < base_size)
703  return NULL;
704 
705  mqm = GNUNET_MQ_msg_ (mhp, size, type);
706  GNUNET_memcpy ((char *) mqm->mh + base_size,
707  nested_mh,
708  ntohs (nested_mh->size));
709 
710  return mqm;
711 }
712 
713 
720 uint32_t
721 GNUNET_MQ_assoc_add (struct GNUNET_MQ_Handle *mq, void *assoc_data)
722 {
723  uint32_t id;
724 
725  if (NULL == mq->assoc_map)
726  {
728  mq->assoc_id = 1;
729  }
730  id = mq->assoc_id++;
733  mq->assoc_map,
734  id,
735  assoc_data,
737  return id;
738 }
739 
740 
748 void *
749 GNUNET_MQ_assoc_get (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
750 {
751  if (NULL == mq->assoc_map)
752  return NULL;
753  return GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
754 }
755 
756 
764 void *
765 GNUNET_MQ_assoc_remove (struct GNUNET_MQ_Handle *mq, uint32_t request_id)
766 {
767  void *val;
768 
769  if (NULL == mq->assoc_map)
770  return NULL;
771  val = GNUNET_CONTAINER_multihashmap32_get (mq->assoc_map, request_id);
773  return val;
774 }
775 
776 
786 void
789  void *cb_cls)
790 {
791  /* allow setting *OR* clearing callback */
792  GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
793  ev->sent_cb = cb;
794  ev->sent_cls = cb_cls;
795 }
796 
797 
803 {
808 
813 
818 
823 
827  void *cb_cls;
828 };
829 
830 
836 void
838 {
840 
841  if (NULL != mq->destroy_impl)
842  {
843  mq->destroy_impl (mq, mq->impl_state);
844  }
845  if (NULL != mq->send_task)
846  {
848  mq->send_task = NULL;
849  }
850  while (NULL != mq->envelope_head)
851  {
852  struct GNUNET_MQ_Envelope *ev;
853 
854  ev = mq->envelope_head;
855  ev->parent_queue = NULL;
857  GNUNET_assert (0 < mq->queue_length);
858  mq->queue_length--;
860  "MQ destroy drops message of type %u\n",
861  ntohs (ev->mh->type));
862  GNUNET_MQ_discard (ev);
863  }
864  if (NULL != mq->current_envelope)
865  {
866  /* we can only discard envelopes that
867  * are not queued! */
868  mq->current_envelope->parent_queue = NULL;
870  "MQ destroy drops current message of type %u\n",
871  ntohs (mq->current_envelope->mh->type));
873  mq->current_envelope = NULL;
874  GNUNET_assert (0 < mq->queue_length);
875  mq->queue_length--;
876  }
877  GNUNET_assert (0 == mq->queue_length);
878  while (NULL != (dnh = mq->dnh_head))
879  {
880  dnh->cb (dnh->cb_cls);
882  }
883  if (NULL != mq->assoc_map)
884  {
886  mq->assoc_map = NULL;
887  }
888  GNUNET_free (mq->handlers);
889  GNUNET_free (mq);
890 }
891 
892 
893 const struct GNUNET_MessageHeader *
895  uint16_t base_size)
896 {
897  uint16_t whole_size;
898  uint16_t nested_size;
899  const struct GNUNET_MessageHeader *nested_msg;
900 
901  whole_size = ntohs (mh->size);
902  GNUNET_assert (whole_size >= base_size);
903  nested_size = whole_size - base_size;
904  if (0 == nested_size)
905  return NULL;
906  if (nested_size < sizeof(struct GNUNET_MessageHeader))
907  {
908  GNUNET_break_op (0);
909  return NULL;
910  }
911  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
912  if (ntohs (nested_msg->size) != nested_size)
913  {
914  GNUNET_break_op (0);
915  return NULL;
916  }
917  return nested_msg;
918 }
919 
920 
928 void
930 {
931  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
932 
933  GNUNET_assert (NULL != mq);
934  GNUNET_assert (NULL != mq->cancel_impl);
935 
937 
938  if (mq->current_envelope == ev)
939  {
940  /* complex case, we already started with transmitting
941  the message using the callbacks. */
943  GNUNET_assert (0 < mq->queue_length);
944  mq->queue_length--;
945  mq->cancel_impl (mq,
946  mq->impl_state);
947  /* continue sending the next message, if any */
949  if (NULL != mq->current_envelope)
950  {
952  mq->envelope_tail,
953  mq->current_envelope);
954 
956  "sending canceled message of type %u queue\n",
957  ntohs (ev->mh->type));
958  mq->send_impl (mq,
959  mq->current_envelope->mh,
960  mq->impl_state);
961  }
962  }
963  else
964  {
965  /* simple case, message is still waiting in the queue */
967  mq->envelope_tail,
968  ev);
969  GNUNET_assert (0 < mq->queue_length);
970  mq->queue_length--;
971  }
972 
973  if (GNUNET_YES != mq->evacuate_called)
974  {
975  ev->parent_queue = NULL;
976  ev->mh = NULL;
977  /* also frees ev */
978  GNUNET_free (ev);
979  }
980 }
981 
982 
990 struct GNUNET_MQ_Envelope *
992 {
993  return mq->current_envelope;
994 }
995 
996 
1003 struct GNUNET_MQ_Envelope *
1005 {
1006  if (NULL != mq->envelope_tail)
1007  return mq->envelope_tail;
1008 
1009  return mq->current_envelope;
1010 }
1011 
1012 
1021 void
1024 {
1025  env->priority = pp;
1027 }
1028 
1029 
1038 {
1039  struct GNUNET_MQ_Handle *mq = env->parent_queue;
1040 
1041  if (GNUNET_YES == env->have_custom_options)
1042  return env->priority;
1043  if (NULL == mq)
1044  return 0;
1045  return mq->priority;
1046 }
1047 
1048 
1060 {
1062 
1063  ret = GNUNET_MAX (p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1064  ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1065  ret |=
1067  ret |=
1069  ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1070  ret |=
1072  return ret;
1073 }
1074 
1075 
1082 void
1085 {
1086  mq->priority = pp;
1087 }
1088 
1089 
1096 const struct GNUNET_MessageHeader *
1098 {
1099  return env->mh;
1100 }
1101 
1102 
1109 const struct GNUNET_MQ_Envelope *
1111 {
1112  return env->next;
1113 }
1114 
1115 
1128  void *cb_cls)
1129 {
1131 
1133  dnh->mq = mq;
1134  dnh->cb = cb;
1135  dnh->cb_cls = cb_cls;
1137  return dnh;
1138 }
1139 
1140 
1146 void
1149 {
1150  struct GNUNET_MQ_Handle *mq = dnh->mq;
1151 
1153  GNUNET_free (dnh);
1154 }
1155 
1156 
1169 void
1171  struct GNUNET_MQ_Envelope **env_tail,
1172  struct GNUNET_MQ_Envelope *env)
1173 {
1174  GNUNET_CONTAINER_DLL_insert (*env_head, *env_tail, env);
1175 }
1176 
1177 
1190 void
1192  struct GNUNET_MQ_Envelope **env_tail,
1193  struct GNUNET_MQ_Envelope *env)
1194 {
1195  GNUNET_CONTAINER_DLL_insert_tail (*env_head, *env_tail, env);
1196 }
1197 
1198 
1211 void
1213  struct GNUNET_MQ_Envelope **env_tail,
1214  struct GNUNET_MQ_Envelope *env)
1215 {
1216  GNUNET_CONTAINER_DLL_remove (*env_head, *env_tail, env);
1217 }
1218 
1219 
1230 struct GNUNET_MQ_MessageHandler *
1232 {
1233  struct GNUNET_MQ_MessageHandler *copy;
1234  unsigned int count;
1235 
1236  if (NULL == handlers)
1237  return NULL;
1238 
1239  count = GNUNET_MQ_count_handlers (handlers);
1240  copy = GNUNET_new_array (count + 1, struct GNUNET_MQ_MessageHandler);
1241  GNUNET_memcpy (copy,
1242  handlers,
1243  count * sizeof(struct GNUNET_MQ_MessageHandler));
1244  return copy;
1245 }
1246 
1247 
1260 struct GNUNET_MQ_MessageHandler *
1262  GNUNET_MQ_MessageCallback agpl_handler,
1263  void *agpl_cls)
1264 {
1265  struct GNUNET_MQ_MessageHandler *copy;
1266  unsigned int count;
1267 
1268  if (NULL == handlers)
1269  return NULL;
1270  count = GNUNET_MQ_count_handlers (handlers);
1271  copy = GNUNET_new_array (count + 2, struct GNUNET_MQ_MessageHandler);
1272  GNUNET_memcpy (copy,
1273  handlers,
1274  count * sizeof(struct GNUNET_MQ_MessageHandler));
1275  copy[count].mv = NULL;
1276  copy[count].cb = agpl_handler;
1277  copy[count].cls = agpl_cls;
1278  copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1279  copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1280  return copy;
1281 }
1282 
1283 
1290 unsigned int
1292 {
1293  unsigned int i;
1294 
1295  if (NULL == handlers)
1296  return 0;
1297 
1298  for (i = 0; NULL != handlers[i].cb; i++)
1299  ;
1300 
1301  return i;
1302 }
1303 
1304 
1311 const char *
1313 {
1314  switch (type)
1315  {
1317  return "NONE";
1318 
1320  return "BANDWIDTH";
1321 
1323  return "LATENCY";
1324 
1326  return "RELIABILITY";
1327  }
1328  ;
1329  return NULL;
1330 }
1331 
1332 
1333 /* end of mq.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
void GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head. Note that env must not be in any MQ while this ...
Definition: mq.c:1191
struct GNUNET_MQ_MessageHandler * handlers
Handlers array, or NULL if the queue should not receive messages.
Definition: mq.c:90
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, enum GNUNET_MQ_PriorityPreferences p2)
Combine performance preferences set for different envelopes that are being combined into one larger e...
Definition: mq.c:1058
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the last envelope in the queue.
Definition: mq.c:1004
void GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head. Note that env must not be in any MQ while this ...
Definition: mq.c:1170
Handle we return for callbacks registered to be notified when GNUNET_MQ_destroy() is called on a queu...
Definition: mq.c:802
struct GNUNET_MessageHeader * mh
Actual allocated message header.
Definition: mq.c:51
static void done()
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
Create a new envelope by copying an existing message.
Definition: mq.c:666
void * impl_state
Implementation-specific state.
Definition: mq.c:111
unsigned int queue_length
Number of entries we have in the envelope-DLL.
Definition: mq.c:176
struct GNUNET_MQ_DestroyNotificationHandle * GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Register function to be called whenever mq is being destroyed.
Definition: mq.c:1126
const struct GNUNET_MessageHeader * GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
Obtain message contained in envelope.
Definition: mq.c:1097
void GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
Call the error handler of a message queue with the given error code.
Definition: mq.c:300
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
struct GNUNET_MQ_DestroyNotificationHandle * dnh_tail
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:158
uint16_t expected_size
Expected size of messages of this type.
uint32_t GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
Associate the assoc_data in mq with a unique request id.
Definition: mq.c:721
void(* GNUNET_MQ_ErrorHandler)(void *cls, enum GNUNET_MQ_Error error)
Generic error handler, called with the appropriate error code and the same closure specified at the c...
GNUNET_MQ_Error
Error codes for the queue.
int have_custom_options
Did the application call GNUNET_MQ_env_set_options()?
Definition: mq.c:78
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Copy an array of handlers.
Definition: mq.c:1231
int evacuate_called
GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
Definition: mq.c:182
#define GNUNET_MESSAGE_TYPE_REQUEST_AGPL
Message to request source code link.
void GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific preferences for this envelope.
Definition: mq.c:1022
No preference was expressed.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
int GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, const struct GNUNET_MessageHeader *mh)
Call the message handler that was registered for the type of the given message in the given h...
Definition: mq.c:229
static struct GNUNET_IDENTITY_Handle * id
Handle to identity service.
GNUNET_MQ_PriorityPreferences
Per envelope preferences and priorities.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
Get the message that should currently be sent.
Definition: mq.c:615
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
const struct GNUNET_MQ_Envelope * GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
Return next envelope in queue.
Definition: mq.c:1110
int GNUNET_CONTAINER_multihashmap32_remove_all(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Remove all entries for the given key from the map.
struct GNUNET_MQ_Envelope * envelope_tail
Linked list of messages pending to be sent.
Definition: mq.c:136
void(* GNUNET_SCHEDULER_TaskCallback)(void *cls)
Signature of the main function of a task.
GNUNET_MQ_SendImpl send_impl
Actual implementation of message sending, called when a message is added.
Definition: mq.c:96
Flag to indicate that low latency is important.
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
Get application-specific options for this envelope.
Definition: mq.c:1037
void GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
Change the closure argument in all of the handlers of the mq.
Definition: mq.c:596
void GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
Call the send notification for the current message, but do not try to send the next message until GNU...
Definition: mq.c:532
int in_flight
GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
Definition: mq.c:187
uint16_t type
Type of the message this handler covers, in host byte order.
Flag to indicate that high bandwidth is desired.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
GNUNET_MQ_ErrorHandler error_handler
Callback will be called when an error occurs.
Definition: mq.c:116
GNUNET_MQ_PreferenceKind
Enum defining all known preference categories.
struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, GNUNET_MQ_CancelImpl cancel, void *impl_state, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue for the specified handlers.
Definition: mq.c:565
void * cls
Closure for mv and cb.
static struct GNUNET_CADET_MessageHandler handlers[]
Handlers, for diverse services.
GNUNET_SCHEDULER_TaskCallback sent_cb
Called after the message was sent irrevocably.
Definition: mq.c:61
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:787
GNUNET_MQ_MessageValidationCallback mv
Callback to validate a message of the specified type.
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh)
Implementation of the GNUNET_MQ_msg_nested_mh macro.
Definition: mq.c:688
void * GNUNET_CONTAINER_multihashmap32_get(const struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Given a key find a value in the map matching the key.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
unsigned int GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
Obtain the current length of the message queue.
Definition: mq.c:337
GNUNET_SCHEDULER_TaskCallback cb
Function to call.
Definition: mq.c:822
#define GNUNET_MAX(a, b)
Definition: gnunet_common.h:95
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1296
GNUNET_MQ_DestroyImpl destroy_impl
Implementation-dependent queue destruction function.
Definition: mq.c:101
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
The preferred transmission for this envelope focuses on maximizing bandwidth.
void * GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
Get the implementation state associated with the message queue.
Definition: mq.c:638
void(* GNUNET_MQ_DestroyImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Signature of functions implementing the destruction of a message queue.
GNUNET_MQ_MessageCallback cb
Callback, called every time a new message of the specified type has been received.
void GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
Call the message handler that was registered for the type of the given message in the given m...
Definition: mq.c:202
We received a message that was malformed and thus could not be passed to its handler.
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this queue by GNUNET_MQ_set_options().
Definition: mq.c:164
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_MessageCallback agpl_handler, void *agpl_cls)
Copy an array of handlers, appending AGPL handler.
Definition: mq.c:1261
Internal representation of the hash map.
struct GNUNET_SCHEDULER_Task * send_task
Task to asynchronously run impl_send_continue().
Definition: mq.c:126
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
Create a new envelope.
Definition: mq.c:645
struct GNUNET_MQ_DestroyNotificationHandle * dnh_head
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:153
Message handler for a specific message type.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
struct GNUNET_MQ_Envelope * prev
Messages are stored in a linked list. Each queue has its own list of envelopes.
Definition: mq.c:44
void GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Remove env from the envelope DLL starting at env_head.
Definition: mq.c:1212
unsigned int GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Count the handlers in a handler array.
Definition: mq.c:1291
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
void * sent_cls
Closure for sent_cb.
Definition: mq.c:66
There must only be one value per key; storing a value should fail if a value under the same key alrea...
GNUNET_MQ_CancelImpl cancel_impl
Implementation-dependent send cancel function.
Definition: mq.c:106
static unsigned int size
Size of the "table".
Definition: peer.c:67
struct GNUNET_MQ_Handle * mq
Queue to notify about.
Definition: mq.c:817
#define LOG(kind,...)
Definition: mq.c:29
struct GNUNET_MQ_Envelope * next
Messages are stored in a linked list.
Definition: mq.c:38
void * error_handler_cls
Closure for the error handler.
Definition: mq.c:121
uint32_t assoc_id
Next id that should be used for the assoc_map, initialized lazily to a random value together with ass...
Definition: mq.c:171
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
void(* GNUNET_MQ_CancelImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Implementation function that cancels the currently sent message.
static void impl_send_continue(void *cls)
Task run to call the send implementation for the next queued message, if any.
Definition: mq.c:467
Bit mask to apply to extract the priority bits.
void(* GNUNET_MQ_MessageCallback)(void *cls, const struct GNUNET_MessageHeader *msg)
Called when a message has been received.
static void error_handler(void *cls, enum GNUNET_MQ_Error error)
We encountered an error handling the MQ to the ATS service.
Handle to a message queue.
Definition: mq.c:85
struct GNUNET_MQ_Handle * parent_queue
Queue the message is queued in, NULL if message is not queued.
Definition: mq.c:56
void GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific default options for this queue.
Definition: mq.c:1083
struct GNUNET_CONTAINER_MultiHashMap32 * assoc_map
Map of associations, lazily allocated.
Definition: mq.c:148
const char * GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
Convert an enum GNUNET_MQ_PreferenceKind to a string.
Definition: mq.c:1312
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this envelope by GNUNET_MQ_env_set_options().
Definition: mq.c:73
void * GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Remove the association for a request_id.
Definition: mq.c:765
struct GNUNET_MQ_DestroyNotificationHandle * next
Kept in a DLL.
Definition: mq.c:812
struct GNUNET_MQ_Envelope * current_envelope
Message that is currently scheduled to be sent.
Definition: mq.c:143
void(* GNUNET_MQ_SendImpl)(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state)
Signature of functions implementing the sending functionality of a message queue. ...
void GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MQ_Envelope *ev)
Send a copy of a message with the given message queue.
Definition: mq.c:443
The preferred transmission for this envelope focuses on minimizing latency.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
Flag to indicate that out-of-order delivery is OK.
void GNUNET_MQ_destroy_notify_cancel(struct GNUNET_MQ_DestroyNotificationHandle *dnh)
Cancel registration from GNUNET_MQ_destroy_notify().
Definition: mq.c:1147
const struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, uint16_t base_size)
Implementation of the #GNUNET_MQ_extract_nested_mh macro.
Definition: mq.c:894
Flag to indicate that CORKing is acceptable.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Header for all communications.
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the current envelope from within GNUNET_MQ_SendImpl implementations.
Definition: mq.c:991
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:837
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
Flag to indicate that unreliable delivery is acceptable.
struct GNUNET_MQ_Envelope * GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
Remove the first envelope that has not yet been sent from the message queue and return it...
Definition: mq.c:405
static void destroy(void *cls)
struct GNUNET_MQ_Envelope * GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
Function to copy an envelope.
Definition: mq.c:425
void GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
Call the send implementation for the next queued message, if any.
Definition: mq.c:499
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:929
struct GNUNET_MQ_DestroyNotificationHandle * prev
Kept in a DLL.
Definition: mq.c:807
struct GNUNET_MQ_Envelope * envelope_head
Linked list of messages pending to be sent.
Definition: mq.c:131
void * cb_cls
Closure for cb.
Definition: mq.c:827
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void * GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Get the data associated with a request_id in a queue.
Definition: mq.c:749
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
The preferred transmission for this envelope focuses on reliability.