GNUnet  0.17.5
mq.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2012-2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 
29 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
30 
31 
33 {
39 
45 
52 
57 
62 
66  void *sent_cls;
67 
74 
79 };
80 
81 
86 {
91 
97 
102 
107 
111  void *impl_state;
112 
117 
122 
127 
132 
137 
144 
149 
154 
159 
165 
171  uint32_t assoc_id;
172 
176  unsigned int queue_length;
177 
181  bool in_flight;
182 };
183 
184 
185 void
187  const struct GNUNET_MessageHeader *mh)
188 {
190 
192  mh);
193  if (GNUNET_SYSERR == ret)
194  {
197  return;
198  }
199 }
200 
201 
204  const struct GNUNET_MessageHeader *mh)
205 {
206  bool handled = false;
207  uint16_t msize = ntohs (mh->size);
208  uint16_t mtype = ntohs (mh->type);
209 
211  "Received message of type %u and size %u\n",
212  mtype,
213  msize);
214  if (NULL == handlers)
215  goto done;
216  for (const struct GNUNET_MQ_MessageHandler *handler = handlers;
217  NULL != handler->cb;
218  handler++)
219  {
220  if (handler->type == mtype)
221  {
222  handled = true;
223  if ( (handler->expected_size > msize) ||
224  ( (handler->expected_size != msize) &&
225  (NULL == handler->mv) ) )
226  {
227  /* Too small, or not an exact size and
228  no 'mv' handler to check rest */
230  "Received malformed message of type %u\n",
231  (unsigned int) handler->type);
232  return GNUNET_SYSERR;
233  }
234  if ( (NULL == handler->mv) ||
235  (GNUNET_OK ==
236  handler->mv (handler->cls,
237  mh)) )
238  {
239  /* message well-formed, pass to handler */
240  handler->cb (handler->cls, mh);
241  }
242  else
243  {
244  /* Message rejected by check routine */
246  "Received malformed message of type %u\n",
247  (unsigned int) handler->type);
248  return GNUNET_SYSERR;
249  }
250  break;
251  }
252  }
253 done:
254  if (! handled)
255  {
257  "No handler for message of type %u and size %u\n",
258  mtype,
259  msize);
260  return GNUNET_NO;
261  }
262  return GNUNET_OK;
263 }
264 
265 
266 void
268  enum GNUNET_MQ_Error error)
269 {
270  if (NULL == mq->error_handler)
271  {
273  "Got error %d, but no handler installed\n",
274  (int) error);
275  return;
276  }
278  error);
279 }
280 
281 
282 void
284 {
285  GNUNET_assert (NULL == ev->parent_queue);
286  GNUNET_free (ev);
287 }
288 
289 
290 unsigned int
292 {
293  if (! mq->in_flight)
294  {
295  return mq->queue_length;
296  }
297  return mq->queue_length - 1;
298 }
299 
300 
301 void
303  struct GNUNET_MQ_Envelope *ev)
304 {
305  if (NULL == mq)
307  "mq is NUll when sending message of type %u\n",
308  (unsigned int) ntohs (ev->mh->type));
309  GNUNET_assert (NULL != mq);
310  GNUNET_assert (NULL == ev->parent_queue);
311 
312  mq->queue_length++;
313  if (mq->queue_length >= 10000000)
314  {
315  /* This would seem like a bug... */
317  "MQ with %u entries extended by message of type %u (FC broken?)\n",
318  (unsigned int) mq->queue_length,
319  (unsigned int) ntohs (ev->mh->type));
320  }
321  ev->parent_queue = mq;
322  /* is the implementation busy? queue it! */
323  if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
324  {
326  mq->envelope_tail,
327  ev);
328  return;
329  }
330  GNUNET_assert (NULL == mq->envelope_head);
331  mq->current_envelope = ev;
332 
334  "sending message of type %u and size %u, queue empty (MQ: %p)\n",
335  ntohs (ev->mh->type),
336  ntohs (ev->mh->size),
337  mq);
338 
339  mq->send_impl (mq,
340  ev->mh,
341  mq->impl_state);
342 }
343 
344 
345 struct GNUNET_MQ_Envelope *
347 {
348  struct GNUNET_MQ_Envelope *env;
349 
350  env = mq->envelope_head;
352  mq->envelope_tail,
353  env);
354  mq->queue_length--;
355  env->parent_queue = NULL;
356  return env;
357 }
358 
359 
360 struct GNUNET_MQ_Envelope *
362 {
363  GNUNET_assert (NULL == env->next);
364  GNUNET_assert (NULL == env->parent_queue);
365  GNUNET_assert (NULL == env->sent_cb);
367  return GNUNET_MQ_msg_copy (env->mh);
368 }
369 
370 
371 void
373  const struct GNUNET_MQ_Envelope *ev)
374 {
375  struct GNUNET_MQ_Envelope *env;
376  uint16_t msize;
377 
378  msize = ntohs (ev->mh->size);
379  env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
380  env->mh = (struct GNUNET_MessageHeader *) &env[1];
381  env->sent_cb = ev->sent_cb;
382  env->sent_cls = ev->sent_cls;
383  GNUNET_memcpy (&env[1], ev->mh, msize);
384  GNUNET_MQ_send (mq, env);
385 }
386 
387 
395 static void
397 {
398  struct GNUNET_MQ_Handle *mq = cls;
399 
400  mq->send_task = NULL;
401  /* call is only valid if we're actually currently sending
402  * a message */
403  if (NULL == mq->envelope_head)
404  return;
407  mq->envelope_tail,
409 
411  "sending message of type %u and size %u from queue (MQ: %p)\n",
412  ntohs (mq->current_envelope->mh->type),
413  ntohs (mq->current_envelope->mh->size),
414  mq);
415 
416  mq->send_impl (mq,
418  mq->impl_state);
419 }
420 
421 
422 void
424 {
425  struct GNUNET_MQ_Envelope *current_envelope;
427 
429  mq->queue_length--;
430  mq->in_flight = false;
431  current_envelope = mq->current_envelope;
432  current_envelope->parent_queue = NULL;
433  mq->current_envelope = NULL;
434  GNUNET_assert (NULL == mq->send_task);
436  if (NULL != (cb = current_envelope->sent_cb))
437  {
438  current_envelope->sent_cb = NULL;
439  cb (current_envelope->sent_cls);
440  }
441  GNUNET_free (current_envelope);
442 }
443 
444 
445 void
447 {
448  struct GNUNET_MQ_Envelope *current_envelope;
450 
451  mq->in_flight = true;
452  /* call is only valid if we're actually currently sending
453  * a message */
454  current_envelope = mq->current_envelope;
455  GNUNET_assert (NULL != current_envelope);
456  /* can't call cancel from now on anymore */
457  current_envelope->parent_queue = NULL;
458  if (NULL != (cb = current_envelope->sent_cb))
459  {
460  current_envelope->sent_cb = NULL;
461  cb (current_envelope->sent_cls);
462  }
463 }
464 
465 
466 struct GNUNET_MQ_Handle *
469  GNUNET_MQ_CancelImpl cancel,
470  void *impl_state,
471  const struct GNUNET_MQ_MessageHandler *handlers,
473  void *error_handler_cls)
474 {
475  struct GNUNET_MQ_Handle *mq;
476 
477  mq = GNUNET_new (struct GNUNET_MQ_Handle);
478  mq->send_impl = send;
480  mq->cancel_impl = cancel;
485 
486  return mq;
487 }
488 
489 
490 void
492  void *handlers_cls)
493 {
494  if (NULL == mq->handlers)
495  return;
496  for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
497  mq->handlers[i].cls = handlers_cls;
498 }
499 
500 
501 const struct GNUNET_MessageHeader *
503 {
504  GNUNET_assert (NULL != mq->current_envelope);
505  GNUNET_assert (NULL != mq->current_envelope->mh);
506  return mq->current_envelope->mh;
507 }
508 
509 
510 void *
512 {
513  return mq->impl_state;
514 }
515 
516 
517 struct GNUNET_MQ_Envelope *
519  uint16_t size,
520  uint16_t type)
521 {
522  struct GNUNET_MQ_Envelope *ev;
523 
524  ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
525  ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
526  ev->mh->size = htons (size);
527  ev->mh->type = htons (type);
528  if (NULL != mhp)
529  *mhp = ev->mh;
530  return ev;
531 }
532 
533 
534 struct GNUNET_MQ_Envelope *
536 {
537  struct GNUNET_MQ_Envelope *mqm;
538  uint16_t size = ntohs (hdr->size);
539 
540  mqm = GNUNET_malloc (sizeof(*mqm) + size);
541  mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
542  GNUNET_memcpy (mqm->mh,
543  hdr,
544  size);
545  return mqm;
546 }
547 
548 
549 struct GNUNET_MQ_Envelope *
551  uint16_t base_size,
552  uint16_t type,
553  const struct GNUNET_MessageHeader *nested_mh)
554 {
555  struct GNUNET_MQ_Envelope *mqm;
556  uint16_t size;
557 
558  if (NULL == nested_mh)
559  return GNUNET_MQ_msg_ (mhp,
560  base_size,
561  type);
562  size = base_size + ntohs (nested_mh->size);
563  /* check for uint16_t overflow */
564  if (size < base_size)
565  return NULL;
566  mqm = GNUNET_MQ_msg_ (mhp,
567  size,
568  type);
569  GNUNET_memcpy ((char *) mqm->mh + base_size,
570  nested_mh,
571  ntohs (nested_mh->size));
572  return mqm;
573 }
574 
575 
576 uint32_t
578  void *assoc_data)
579 {
580  uint32_t id;
581 
582  if (NULL == mq->assoc_map)
583  {
585  mq->assoc_id = 1;
586  }
587  id = mq->assoc_id++;
590  mq->assoc_map,
591  id,
592  assoc_data,
594  return id;
595 }
596 
597 
605 void *
607  uint32_t request_id)
608 {
609  if (NULL == mq->assoc_map)
610  return NULL;
612  request_id);
613 }
614 
615 
623 void *
625  uint32_t request_id)
626 {
627  void *val;
628 
629  if (NULL == mq->assoc_map)
630  return NULL;
632  request_id);
634  request_id);
635  return val;
636 }
637 
638 
639 void
642  void *cb_cls)
643 {
644  /* allow setting *OR* clearing callback */
645  GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
646  ev->sent_cb = cb;
647  ev->sent_cls = cb_cls;
648 }
649 
650 
656 {
661 
666 
671 
676 
680  void *cb_cls;
681 };
682 
683 
684 void
686 {
688 
689  if (NULL != mq->destroy_impl)
690  {
692  }
693  if (NULL != mq->send_task)
694  {
696  mq->send_task = NULL;
697  }
698  while (NULL != mq->envelope_head)
699  {
700  struct GNUNET_MQ_Envelope *ev;
701 
702  ev = mq->envelope_head;
703  ev->parent_queue = NULL;
706  mq->queue_length--;
708  "MQ destroy drops message of type %u\n",
709  ntohs (ev->mh->type));
710  GNUNET_MQ_discard (ev);
711  }
712  if (NULL != mq->current_envelope)
713  {
714  /* we can only discard envelopes that
715  * are not queued! */
718  "MQ destroy drops current message of type %u\n",
719  ntohs (mq->current_envelope->mh->type));
721  mq->current_envelope = NULL;
723  mq->queue_length--;
724  }
725  GNUNET_assert (0 == mq->queue_length);
726  while (NULL != (dnh = mq->dnh_head))
727  {
728  dnh->cb (dnh->cb_cls);
730  }
731  if (NULL != mq->assoc_map)
732  {
734  mq->assoc_map = NULL;
735  }
737  GNUNET_free (mq);
738 }
739 
740 
741 const struct GNUNET_MessageHeader *
743  uint16_t base_size)
744 {
745  uint16_t whole_size;
746  uint16_t nested_size;
747  const struct GNUNET_MessageHeader *nested_msg;
748 
749  whole_size = ntohs (mh->size);
750  GNUNET_assert (whole_size >= base_size);
751  nested_size = whole_size - base_size;
752  if (0 == nested_size)
753  return NULL;
754  if (nested_size < sizeof(struct GNUNET_MessageHeader))
755  {
756  GNUNET_break_op (0);
757  return NULL;
758  }
759  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
760  if (ntohs (nested_msg->size) != nested_size)
761  {
762  GNUNET_break_op (0);
763  return NULL;
764  }
765  return nested_msg;
766 }
767 
768 
769 void
771 {
772  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
773 
774  GNUNET_assert (NULL != mq);
775  GNUNET_assert (NULL != mq->cancel_impl);
777  mq->queue_length--;
778  if (mq->current_envelope == ev)
779  {
780  /* complex case, we already started with transmitting
781  the message using the callbacks. */
783  mq->cancel_impl (mq,
784  mq->impl_state);
785  /* continue sending the next message, if any */
787  if (NULL != mq->current_envelope)
788  {
790  mq->envelope_tail,
793  "sending canceled message of type %u queue\n",
794  ntohs (ev->mh->type));
795  mq->send_impl (mq,
797  mq->impl_state);
798  }
799  }
800  else
801  {
802  /* simple case, message is still waiting in the queue */
804  mq->envelope_tail,
805  ev);
806  }
807  ev->parent_queue = NULL;
808  ev->mh = NULL;
809  /* also frees ev */
810  GNUNET_free (ev);
811 }
812 
813 
814 struct GNUNET_MQ_Envelope *
816 {
817  return mq->current_envelope;
818 }
819 
820 
821 struct GNUNET_MQ_Envelope *
823 {
824  if (NULL != mq->envelope_tail)
825  return mq->envelope_tail;
826 
827  return mq->current_envelope;
828 }
829 
830 
831 void
834 {
835  env->priority = pp;
837 }
838 
839 
842 {
843  struct GNUNET_MQ_Handle *mq = env->parent_queue;
844 
846  return env->priority;
847  if (NULL == mq)
848  return 0;
849  return mq->priority;
850 }
851 
852 
856 {
858 
861  ret |=
863  ret |=
866  ret |=
868  return ret;
869 }
870 
871 
872 void
875 {
876  mq->priority = pp;
877 }
878 
879 
880 const struct GNUNET_MessageHeader *
882 {
883  return env->mh;
884 }
885 
886 
887 const struct GNUNET_MQ_Envelope *
889 {
890  return env->next;
891 }
892 
893 
897  void *cb_cls)
898 {
900 
902  dnh->mq = mq;
903  dnh->cb = cb;
904  dnh->cb_cls = cb_cls;
906  mq->dnh_tail,
907  dnh);
908  return dnh;
909 }
910 
911 
912 void
915 {
916  struct GNUNET_MQ_Handle *mq = dnh->mq;
917 
919  mq->dnh_tail,
920  dnh);
921  GNUNET_free (dnh);
922 }
923 
924 
925 void
927  struct GNUNET_MQ_Envelope **env_tail,
928  struct GNUNET_MQ_Envelope *env)
929 {
930  GNUNET_CONTAINER_DLL_insert (*env_head,
931  *env_tail,
932  env);
933 }
934 
935 
936 void
938  struct GNUNET_MQ_Envelope **env_tail,
939  struct GNUNET_MQ_Envelope *env)
940 {
942  *env_tail,
943  env);
944 }
945 
946 
947 void
949  struct GNUNET_MQ_Envelope **env_tail,
950  struct GNUNET_MQ_Envelope *env)
951 {
952  GNUNET_CONTAINER_DLL_remove (*env_head,
953  *env_tail,
954  env);
955 }
956 
957 
960 {
961  struct GNUNET_MQ_MessageHandler *copy;
962  unsigned int count;
963 
964  if (NULL == handlers)
965  return NULL;
967  copy = GNUNET_new_array (count + 1,
968  struct GNUNET_MQ_MessageHandler);
969  GNUNET_memcpy (copy,
970  handlers,
971  count * sizeof(struct GNUNET_MQ_MessageHandler));
972  return copy;
973 }
974 
975 
978  GNUNET_MQ_MessageCallback agpl_handler,
979  void *agpl_cls)
980 {
981  struct GNUNET_MQ_MessageHandler *copy;
982  unsigned int count;
983 
984  if (NULL == handlers)
985  return NULL;
987  copy = GNUNET_new_array (count + 2,
988  struct GNUNET_MQ_MessageHandler);
989  GNUNET_memcpy (copy,
990  handlers,
991  count * sizeof(struct GNUNET_MQ_MessageHandler));
992  copy[count].mv = NULL;
993  copy[count].cb = agpl_handler;
994  copy[count].cls = agpl_cls;
996  copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
997  return copy;
998 }
999 
1000 
1001 unsigned int
1003 {
1004  unsigned int i;
1005 
1006  if (NULL == handlers)
1007  return 0;
1008  for (i = 0; NULL != handlers[i].cb; i++)
1009  ;
1010  return i;
1011 }
1012 
1013 
1014 const char *
1016 {
1017  switch (type)
1018  {
1020  return "NONE";
1022  return "BANDWIDTH";
1024  return "LATENCY";
1026  return "RELIABILITY";
1027  }
1028  return NULL;
1029 }
1030 
1031 
1032 /* end of mq.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static void error_handler(void *cls, enum GNUNET_MQ_Error error)
We encountered an error handling the MQ to the ATS service.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static void done()
static struct GNUNET_CADET_MessageHandler handlers[]
Handlers, for diverse services.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
static void destroy(void *cls)
static struct GNUNET_IDENTITY_Handle * id
Handle to identity service.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void * GNUNET_CONTAINER_multihashmap32_get(const struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Given a key find a value in the map matching the key.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_remove_all(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Remove all entries for the given key from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_MAX(a, b)
uint16_t expected_size
Expected size of messages of this type.
GNUNET_MQ_MessageCallback cb
Callback, called every time a new message of the specified type has been received.
void * cls
Closure for mv and cb.
GNUNET_MQ_MessageValidationCallback mv
Callback to validate a message of the specified type.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
Definition: gnunet_common.h:96
uint16_t type
Type of the message this handler covers, in host byte order.
@ GNUNET_OK
Definition: gnunet_common.h:99
@ GNUNET_YES
@ GNUNET_NO
Definition: gnunet_common.h:98
@ GNUNET_SYSERR
Definition: gnunet_common.h:97
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MQ_Envelope *ev)
Send a copy of a message with the given message queue.
Definition: mq.c:372
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
Create a new envelope.
Definition: mq.c:518
const struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, uint16_t base_size)
Implementation of the #GNUNET_MQ_extract_nexted_mh macro.
Definition: mq.c:742
void GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this queue.
Definition: mq.c:873
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:770
void GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Remove env from the envelope DLL starting at env_head.
Definition: mq.c:948
void GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this envelope.
Definition: mq.c:832
unsigned int GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
Obtain the current length of the message queue.
Definition: mq.c:291
struct GNUNET_MQ_DestroyNotificationHandle * GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Register function to be called whenever mq is being destroyed.
Definition: mq.c:895
const struct GNUNET_MQ_Envelope * GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
Return next envelope in queue.
Definition: mq.c:888
GNUNET_MQ_Error
Error codes for the queue.
struct GNUNET_MQ_Envelope * GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
Remove the first envelope that has not yet been sent from the message queue and return it.
Definition: mq.c:346
void GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
Call the error handler of a message queue with the given error code.
Definition: mq.c:267
void(* GNUNET_MQ_ErrorHandler)(void *cls, enum GNUNET_MQ_Error error)
Generic error handler, called with the appropriate error code and the same closure specified at the c...
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:302
void(* GNUNET_MQ_DestroyImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Signature of functions implementing the destruction of a message queue.
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the last envelope in the queue.
Definition: mq.c:822
void GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:926
void * GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
Get the implementation state associated with the message queue.
Definition: mq.c:511
const char * GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
Convert an enum GNUNET_MQ_PreferenceType to a string.
Definition: mq.c:1015
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
Discard the message queue message, free all allocated resources.
Definition: mq.c:283
struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, GNUNET_MQ_CancelImpl cancel, void *impl_state, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue for the specified handlers.
Definition: mq.c:467
GNUNET_MQ_PreferenceKind
Enum defining all known preference categories.
void GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
Call the send implementation for the next queued message, if any.
Definition: mq.c:423
void * GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Remove the association for a request_id.
Definition: mq.c:624
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, enum GNUNET_MQ_PriorityPreferences p2)
Combine performance preferences set for different envelopes that are being combined into one larger e...
Definition: mq.c:854
void GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given m...
Definition: mq.c:186
GNUNET_MQ_PriorityPreferences
Per envelope preferences and priorities.
void GNUNET_MQ_destroy_notify_cancel(struct GNUNET_MQ_DestroyNotificationHandle *dnh)
Cancel registration from GNUNET_MQ_destroy_notify().
Definition: mq.c:913
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:640
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
Create a new envelope by copying an existing message.
Definition: mq.c:535
uint32_t GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
Associate the assoc_data in mq with a unique request id.
Definition: mq.c:577
void(* GNUNET_MQ_MessageCallback)(void *cls, const struct GNUNET_MessageHeader *msg)
Called when a message has been received.
enum GNUNET_GenericReturnValue GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given h...
Definition: mq.c:203
void * GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Get the data associated with a request_id in a queue.
Definition: mq.c:606
const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
Get the message that should currently be sent.
Definition: mq.c:502
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
Get performance preferences set for this envelope.
Definition: mq.c:841
void GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
Call the send notification for the current message, but do not try to send the next message until #gn...
Definition: mq.c:446
struct GNUNET_MQ_Envelope * GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
Function to copy an envelope.
Definition: mq.c:361
void GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
Change the closure argument in all of the handlers of the mq.
Definition: mq.c:491
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_MessageCallback agpl_handler, void *agpl_cls)
Copy an array of handlers, appending AGPL handler.
Definition: mq.c:977
void GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:937
void(* GNUNET_MQ_CancelImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Implementation function that cancels the currently sent message.
void(* GNUNET_MQ_SendImpl)(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state)
Signature of functions implementing the sending functionality of a message queue.
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Copy an array of handlers.
Definition: mq.c:959
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the current envelope from within GNUNET_MQ_SendImpl implementations.
Definition: mq.c:815
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh)
Implementation of the GNUNET_MQ_msg_nested_mh macro.
Definition: mq.c:550
unsigned int GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Count the handlers in a handler array.
Definition: mq.c:1002
const struct GNUNET_MessageHeader * GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
Obtain message contained in envelope.
Definition: mq.c:881
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:685
@ GNUNET_MQ_ERROR_MALFORMED
We received a message that was malformed and thus could not be passed to its handler.
@ GNUNET_MQ_PREFERENCE_RELIABILITY
The preferred transmission for this envelope foces on reliability.
@ GNUNET_MQ_PREFERENCE_NONE
No preference was expressed.
@ GNUNET_MQ_PREFERENCE_LATENCY
The preferred transmission for this envelope foces on minimizing latency.
@ GNUNET_MQ_PREFERENCE_BANDWIDTH
The preferred transmission for this envelope focuses on maximizing bandwidth.
@ GNUNET_MQ_PREF_OUT_OF_ORDER
Flag to indicate that out-of-order delivery is OK.
@ GNUNET_MQ_PRIORITY_MASK
Bit mask to apply to extract the priority bits.
@ GNUNET_MQ_PREF_CORK_ALLOWED
Flag to indicate that CORKing is acceptable.
@ GNUNET_MQ_PREF_UNRELIABLE
Flag to indicate that unreliable delivery is acceptable.
@ GNUNET_MQ_PREF_GOODPUT
Flag to indicate that high bandwidth is desired.
@ GNUNET_MQ_PREF_LOW_LATENCY
Flag to indicate that low latency is important.
#define GNUNET_MESSAGE_TYPE_REQUEST_AGPL
Message to request source code link.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1281
void(* GNUNET_SCHEDULER_TaskCallback)(void *cls)
Signature of the main function of a task.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:957
static void impl_send_continue(void *cls)
Task run to call the send implementation for the next queued message, if any.
Definition: mq.c:396
#define LOG(kind,...)
Definition: mq.c:29
static unsigned int size
Size of the "table".
Definition: peer.c:67
Internal representation of the hash map.
Handle we return for callbacks registered to be notified when GNUNET_MQ_destroy() is called on a queu...
Definition: mq.c:656
struct GNUNET_MQ_DestroyNotificationHandle * next
Kept in a DLL.
Definition: mq.c:665
struct GNUNET_MQ_DestroyNotificationHandle * prev
Kept in a DLL.
Definition: mq.c:660
GNUNET_SCHEDULER_TaskCallback cb
Function to call.
Definition: mq.c:675
void * cb_cls
Closure for cb.
Definition: mq.c:680
struct GNUNET_MQ_Handle * mq
Queue to notify about.
Definition: mq.c:670
struct GNUNET_MQ_Handle * parent_queue
Queue the message is queued in, NULL if message is not queued.
Definition: mq.c:56
void * sent_cls
Closure for send_cb.
Definition: mq.c:66
int have_custom_options
Did the application call GNUNET_MQ_env_set_options()?
Definition: mq.c:78
struct GNUNET_MessageHeader * mh
Actual allocated message header.
Definition: mq.c:51
struct GNUNET_MQ_Envelope * next
Messages are stored in a linked list.
Definition: mq.c:38
GNUNET_SCHEDULER_TaskCallback sent_cb
Called after the message was sent irrevocably.
Definition: mq.c:61
struct GNUNET_MQ_Envelope * prev
Messages are stored in a linked list Each queue has its own list of envelopes.
Definition: mq.c:44
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this envelope by GNUNET_MQ_env_set_options().
Definition: mq.c:73
Handle to a message queue.
Definition: mq.c:86
GNUNET_MQ_DestroyImpl destroy_impl
Implementation-dependent queue destruction function.
Definition: mq.c:101
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this queue by GNUNET_MQ_set_options().
Definition: mq.c:164
uint32_t assoc_id
Next id that should be used for the assoc_map, initialized lazily to a random value together with ass...
Definition: mq.c:171
struct GNUNET_MQ_DestroyNotificationHandle * dnh_head
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:153
struct GNUNET_MQ_Envelope * envelope_head
Linked list of messages pending to be sent.
Definition: mq.c:131
struct GNUNET_MQ_Envelope * current_envelope
Message that is currently scheduled to be sent.
Definition: mq.c:143
GNUNET_MQ_CancelImpl cancel_impl
Implementation-dependent send cancel function.
Definition: mq.c:106
GNUNET_MQ_ErrorHandler error_handler
Callback will be called when an error occurs.
Definition: mq.c:116
struct GNUNET_MQ_DestroyNotificationHandle * dnh_tail
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:158
unsigned int queue_length
Number of entries we have in the envelope-DLL.
Definition: mq.c:176
void * impl_state
Implementation-specific state.
Definition: mq.c:111
struct GNUNET_MQ_MessageHandler * handlers
Handlers array, or NULL if the queue should not receive messages.
Definition: mq.c:90
struct GNUNET_SCHEDULER_Task * send_task
Task to asynchronously run impl_send_continue().
Definition: mq.c:126
void * error_handler_cls
Closure for the error handler.
Definition: mq.c:121
GNUNET_MQ_SendImpl send_impl
Actual implementation of message sending, called when a message is added.
Definition: mq.c:96
struct GNUNET_MQ_Envelope * envelope_tail
Linked list of messages pending to be sent.
Definition: mq.c:136
struct GNUNET_CONTAINER_MultiHashMap32 * assoc_map
Map of associations, lazily allocated.
Definition: mq.c:148
bool in_flight
True if GNUNET_MQ_impl_send_in_flight() was called.
Definition: mq.c:181
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Entry in list of pending tasks.
Definition: scheduler.c:135
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model