GNUnet  0.19.2
mq.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2012-2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 
30 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
31 
32 
34 {
40 
46 
53 
58 
63 
67  void *sent_cls;
68 
75 
80 };
81 
82 
87 {
92 
98 
103 
108 
112  void *impl_state;
113 
118 
123 
128 
133 
138 
145 
150 
155 
160 
166 
172  uint32_t assoc_id;
173 
177  unsigned int queue_length;
178 
182  bool in_flight;
183 };
184 
185 
186 void
188  const struct GNUNET_MessageHeader *mh)
189 {
191 
193  mh);
194  if (GNUNET_SYSERR == ret)
195  {
196  GNUNET_break_op (0);
199  return;
200  }
201 }
202 
203 
206  const struct GNUNET_MessageHeader *mh)
207 {
208  bool handled = false;
209  uint16_t msize = ntohs (mh->size);
210  uint16_t mtype = ntohs (mh->type);
211 
213  "Received message of type %u and size %u\n",
214  mtype,
215  msize);
216  if (NULL == handlers)
217  goto done;
218  for (const struct GNUNET_MQ_MessageHandler *handler = handlers;
219  NULL != handler->cb;
220  handler++)
221  {
222  if (handler->type == mtype)
223  {
224  handled = true;
225  if ( (handler->expected_size > msize) ||
226  ( (handler->expected_size != msize) &&
227  (NULL == handler->mv) ) )
228  {
229  /* Too small, or not an exact size and
230  no 'mv' handler to check rest */
232  "Received malformed message of type %u\n",
233  (unsigned int) handler->type);
234  return GNUNET_SYSERR;
235  }
236  if ( (NULL == handler->mv) ||
237  (GNUNET_OK ==
238  handler->mv (handler->cls,
239  mh)) )
240  {
241  /* message well-formed, pass to handler */
242  handler->cb (handler->cls, mh);
243  }
244  else
245  {
246  /* Message rejected by check routine */
248  "Received malformed message of type %u\n",
249  (unsigned int) handler->type);
250  return GNUNET_SYSERR;
251  }
252  break;
253  }
254  }
255 done:
256  if (! handled)
257  {
259  "No handler for message of type %u and size %u\n",
260  mtype,
261  msize);
262  return GNUNET_NO;
263  }
264  return GNUNET_OK;
265 }
266 
267 
268 void
270  enum GNUNET_MQ_Error error)
271 {
272  if (NULL == mq->error_handler)
273  {
275  "Got error %d, but no handler installed\n",
276  (int) error);
277  return;
278  }
280  error);
281 }
282 
283 
284 void
286 {
287  GNUNET_assert (NULL == ev->parent_queue);
288  GNUNET_free (ev);
289 }
290 
291 
292 unsigned int
294 {
295  if (! mq->in_flight)
296  {
297  return mq->queue_length;
298  }
299  return mq->queue_length - 1;
300 }
301 
302 
303 void
305  struct GNUNET_MQ_Envelope *ev)
306 {
307  if (NULL == mq)
309  "mq is NUll when sending message of type %u\n",
310  (unsigned int) ntohs (ev->mh->type));
311  GNUNET_assert (NULL != mq);
312  GNUNET_assert (NULL == ev->parent_queue);
313 
314  mq->queue_length++;
315  if (mq->queue_length >= 10000000)
316  {
317  /* This would seem like a bug... */
319  "MQ with %u entries extended by message of type %u (FC broken?)\n",
320  (unsigned int) mq->queue_length,
321  (unsigned int) ntohs (ev->mh->type));
322  }
323  ev->parent_queue = mq;
324  /* is the implementation busy? queue it! */
325  if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
326  {
328  mq->envelope_tail,
329  ev);
330  return;
331  }
332  GNUNET_assert (NULL == mq->envelope_head);
333  mq->current_envelope = ev;
334 
336  "sending message of type %u and size %u, queue empty (MQ: %p)\n",
337  ntohs (ev->mh->type),
338  ntohs (ev->mh->size),
339  mq);
340 
341  mq->send_impl (mq,
342  ev->mh,
343  mq->impl_state);
344 }
345 
346 
347 struct GNUNET_MQ_Envelope *
349 {
350  struct GNUNET_MQ_Envelope *env;
351 
352  env = mq->envelope_head;
354  mq->envelope_tail,
355  env);
356  mq->queue_length--;
357  env->parent_queue = NULL;
358  return env;
359 }
360 
361 
362 struct GNUNET_MQ_Envelope *
364 {
365  GNUNET_assert (NULL == env->next);
366  GNUNET_assert (NULL == env->parent_queue);
367  GNUNET_assert (NULL == env->sent_cb);
369  return GNUNET_MQ_msg_copy (env->mh);
370 }
371 
372 
373 void
375  const struct GNUNET_MQ_Envelope *ev)
376 {
377  struct GNUNET_MQ_Envelope *env;
378  uint16_t msize;
379 
380  msize = ntohs (ev->mh->size);
381  env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
382  env->mh = (struct GNUNET_MessageHeader *) &env[1];
383  env->sent_cb = ev->sent_cb;
384  env->sent_cls = ev->sent_cls;
385  GNUNET_memcpy (&env[1], ev->mh, msize);
386  GNUNET_MQ_send (mq, env);
387 }
388 
389 
397 static void
399 {
400  struct GNUNET_MQ_Handle *mq = cls;
401 
402  mq->send_task = NULL;
403  /* call is only valid if we're actually currently sending
404  * a message */
405  if (NULL == mq->envelope_head)
406  return;
409  mq->envelope_tail,
411 
413  "sending message of type %u and size %u from queue (MQ: %p)\n",
414  ntohs (mq->current_envelope->mh->type),
415  ntohs (mq->current_envelope->mh->size),
416  mq);
417 
418  mq->send_impl (mq,
420  mq->impl_state);
421 }
422 
423 
424 void
426 {
427  struct GNUNET_MQ_Envelope *current_envelope;
429 
431  mq->queue_length--;
432  mq->in_flight = false;
433  current_envelope = mq->current_envelope;
434  current_envelope->parent_queue = NULL;
435  mq->current_envelope = NULL;
436  GNUNET_assert (NULL == mq->send_task);
438  if (NULL != (cb = current_envelope->sent_cb))
439  {
440  current_envelope->sent_cb = NULL;
441  cb (current_envelope->sent_cls);
442  }
443  GNUNET_free (current_envelope);
444 }
445 
446 
447 void
449 {
450  struct GNUNET_MQ_Envelope *current_envelope;
452 
453  mq->in_flight = true;
454  /* call is only valid if we're actually currently sending
455  * a message */
456  current_envelope = mq->current_envelope;
457  GNUNET_assert (NULL != current_envelope);
458  /* can't call cancel from now on anymore */
459  current_envelope->parent_queue = NULL;
460  if (NULL != (cb = current_envelope->sent_cb))
461  {
462  current_envelope->sent_cb = NULL;
463  cb (current_envelope->sent_cls);
464  }
465 }
466 
467 
468 struct GNUNET_MQ_Handle *
471  GNUNET_MQ_CancelImpl cancel,
472  void *impl_state,
473  const struct GNUNET_MQ_MessageHandler *handlers,
475  void *error_handler_cls)
476 {
477  struct GNUNET_MQ_Handle *mq;
478 
479  mq = GNUNET_new (struct GNUNET_MQ_Handle);
480  mq->send_impl = send;
482  mq->cancel_impl = cancel;
487 
488  return mq;
489 }
490 
491 
492 void
494  void *handlers_cls)
495 {
496  if (NULL == mq->handlers)
497  return;
498  for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
499  mq->handlers[i].cls = handlers_cls;
500 }
501 
502 
503 const struct GNUNET_MessageHeader *
505 {
506  GNUNET_assert (NULL != mq->current_envelope);
507  GNUNET_assert (NULL != mq->current_envelope->mh);
508  return mq->current_envelope->mh;
509 }
510 
511 
512 void *
514 {
515  return mq->impl_state;
516 }
517 
518 
519 struct GNUNET_MQ_Envelope *
521  uint16_t size,
522  uint16_t type)
523 {
524  struct GNUNET_MQ_Envelope *ev;
525 
526  ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
527  ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
528  ev->mh->size = htons (size);
529  ev->mh->type = htons (type);
530  if (NULL != mhp)
531  *mhp = ev->mh;
532  return ev;
533 }
534 
535 
536 struct GNUNET_MQ_Envelope *
538 {
539  struct GNUNET_MQ_Envelope *mqm;
540  uint16_t size = ntohs (hdr->size);
541 
542  mqm = GNUNET_malloc (sizeof(*mqm) + size);
543  mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
544  GNUNET_memcpy (mqm->mh,
545  hdr,
546  size);
547  return mqm;
548 }
549 
550 
551 struct GNUNET_MQ_Envelope *
553  uint16_t base_size,
554  uint16_t type,
555  const struct GNUNET_MessageHeader *nested_mh)
556 {
557  struct GNUNET_MQ_Envelope *mqm;
558  uint16_t size;
559 
560  if (NULL == nested_mh)
561  return GNUNET_MQ_msg_ (mhp,
562  base_size,
563  type);
564  size = base_size + ntohs (nested_mh->size);
565  /* check for uint16_t overflow */
566  if (size < base_size)
567  return NULL;
568  mqm = GNUNET_MQ_msg_ (mhp,
569  size,
570  type);
571  GNUNET_memcpy ((char *) mqm->mh + base_size,
572  nested_mh,
573  ntohs (nested_mh->size));
574  return mqm;
575 }
576 
577 
578 uint32_t
580  void *assoc_data)
581 {
582  uint32_t id;
583 
584  if (NULL == mq->assoc_map)
585  {
587  mq->assoc_id = 1;
588  }
589  id = mq->assoc_id++;
592  mq->assoc_map,
593  id,
594  assoc_data,
596  return id;
597 }
598 
599 
607 void *
609  uint32_t request_id)
610 {
611  if (NULL == mq->assoc_map)
612  return NULL;
614  request_id);
615 }
616 
617 
625 void *
627  uint32_t request_id)
628 {
629  void *val;
630 
631  if (NULL == mq->assoc_map)
632  return NULL;
634  request_id);
636  request_id);
637  return val;
638 }
639 
640 
641 void
644  void *cb_cls)
645 {
646  /* allow setting *OR* clearing callback */
647  GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
648  ev->sent_cb = cb;
649  ev->sent_cls = cb_cls;
650 }
651 
652 
658 {
663 
668 
673 
678 
682  void *cb_cls;
683 };
684 
685 
686 void
688 {
690 
691  if (NULL != mq->destroy_impl)
692  {
694  }
695  if (NULL != mq->send_task)
696  {
698  mq->send_task = NULL;
699  }
700  while (NULL != mq->envelope_head)
701  {
702  struct GNUNET_MQ_Envelope *ev;
703 
704  ev = mq->envelope_head;
705  ev->parent_queue = NULL;
708  mq->queue_length--;
710  "MQ destroy drops message of type %u\n",
711  ntohs (ev->mh->type));
712  GNUNET_MQ_discard (ev);
713  }
714  if (NULL != mq->current_envelope)
715  {
716  /* we can only discard envelopes that
717  * are not queued! */
720  "MQ destroy drops current message of type %u\n",
721  ntohs (mq->current_envelope->mh->type));
723  mq->current_envelope = NULL;
725  mq->queue_length--;
726  }
727  GNUNET_assert (0 == mq->queue_length);
728  while (NULL != (dnh = mq->dnh_head))
729  {
730  dnh->cb (dnh->cb_cls);
732  }
733  if (NULL != mq->assoc_map)
734  {
736  mq->assoc_map = NULL;
737  }
739  GNUNET_free (mq);
740 }
741 
742 
743 const struct GNUNET_MessageHeader *
745  uint16_t base_size)
746 {
747  uint16_t whole_size;
748  uint16_t nested_size;
749  const struct GNUNET_MessageHeader *nested_msg;
750 
751  whole_size = ntohs (mh->size);
752  GNUNET_assert (whole_size >= base_size);
753  nested_size = whole_size - base_size;
754  if (0 == nested_size)
755  return NULL;
756  if (nested_size < sizeof(struct GNUNET_MessageHeader))
757  {
758  GNUNET_break_op (0);
759  return NULL;
760  }
761  nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
762  if (ntohs (nested_msg->size) != nested_size)
763  {
764  GNUNET_break_op (0);
765  return NULL;
766  }
767  return nested_msg;
768 }
769 
770 
771 void
773 {
774  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
775 
776  GNUNET_assert (NULL != mq);
777  GNUNET_assert (NULL != mq->cancel_impl);
779  mq->queue_length--;
780  if (mq->current_envelope == ev)
781  {
782  /* complex case, we already started with transmitting
783  the message using the callbacks. */
785  mq->cancel_impl (mq,
786  mq->impl_state);
787  /* continue sending the next message, if any */
789  if (NULL != mq->current_envelope)
790  {
792  mq->envelope_tail,
795  "sending canceled message of type %u queue\n",
796  ntohs (ev->mh->type));
797  mq->send_impl (mq,
799  mq->impl_state);
800  }
801  }
802  else
803  {
804  /* simple case, message is still waiting in the queue */
806  mq->envelope_tail,
807  ev);
808  }
809  ev->parent_queue = NULL;
810  ev->mh = NULL;
811  /* also frees ev */
812  GNUNET_free (ev);
813 }
814 
815 
816 struct GNUNET_MQ_Envelope *
818 {
819  return mq->current_envelope;
820 }
821 
822 
823 struct GNUNET_MQ_Envelope *
825 {
826  if (NULL != mq->envelope_tail)
827  return mq->envelope_tail;
828 
829  return mq->current_envelope;
830 }
831 
832 
833 void
836 {
837  env->priority = pp;
839 }
840 
841 
844 {
845  struct GNUNET_MQ_Handle *mq = env->parent_queue;
846 
848  return env->priority;
849  if (NULL == mq)
850  return 0;
851  return mq->priority;
852 }
853 
854 
858 {
860 
863  ret |=
865  ret |=
868  ret |=
870  return ret;
871 }
872 
873 
874 void
877 {
878  mq->priority = pp;
879 }
880 
881 
882 const struct GNUNET_MessageHeader *
884 {
885  return env->mh;
886 }
887 
888 
889 const struct GNUNET_MQ_Envelope *
891 {
892  return env->next;
893 }
894 
895 
899  void *cb_cls)
900 {
902 
904  dnh->mq = mq;
905  dnh->cb = cb;
906  dnh->cb_cls = cb_cls;
908  mq->dnh_tail,
909  dnh);
910  return dnh;
911 }
912 
913 
914 void
917 {
918  struct GNUNET_MQ_Handle *mq = dnh->mq;
919 
921  mq->dnh_tail,
922  dnh);
923  GNUNET_free (dnh);
924 }
925 
926 
927 void
929  struct GNUNET_MQ_Envelope **env_tail,
930  struct GNUNET_MQ_Envelope *env)
931 {
932  GNUNET_CONTAINER_DLL_insert (*env_head,
933  *env_tail,
934  env);
935 }
936 
937 
938 void
940  struct GNUNET_MQ_Envelope **env_tail,
941  struct GNUNET_MQ_Envelope *env)
942 {
944  *env_tail,
945  env);
946 }
947 
948 
949 void
951  struct GNUNET_MQ_Envelope **env_tail,
952  struct GNUNET_MQ_Envelope *env)
953 {
954  GNUNET_CONTAINER_DLL_remove (*env_head,
955  *env_tail,
956  env);
957 }
958 
959 
962 {
963  struct GNUNET_MQ_MessageHandler *copy;
964  unsigned int count;
965 
966  if (NULL == handlers)
967  return NULL;
969  copy = GNUNET_new_array (count + 1,
970  struct GNUNET_MQ_MessageHandler);
971  GNUNET_memcpy (copy,
972  handlers,
973  count * sizeof(struct GNUNET_MQ_MessageHandler));
974  return copy;
975 }
976 
977 
980  GNUNET_MQ_MessageCallback agpl_handler,
981  void *agpl_cls)
982 {
983  struct GNUNET_MQ_MessageHandler *copy;
984  unsigned int count;
985 
986  if (NULL == handlers)
987  return NULL;
989  copy = GNUNET_new_array (count + 2,
990  struct GNUNET_MQ_MessageHandler);
991  GNUNET_memcpy (copy,
992  handlers,
993  count * sizeof(struct GNUNET_MQ_MessageHandler));
994  copy[count].mv = NULL;
995  copy[count].cb = agpl_handler;
996  copy[count].cls = agpl_cls;
998  copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
999  return copy;
1000 }
1001 
1002 
1003 unsigned int
1005 {
1006  unsigned int i;
1007 
1008  if (NULL == handlers)
1009  return 0;
1010  for (i = 0; NULL != handlers[i].cb; i++)
1011  ;
1012  return i;
1013 }
1014 
1015 
1016 const char *
1018 {
1019  switch (type)
1020  {
1022  return "NONE";
1024  return "BANDWIDTH";
1026  return "LATENCY";
1028  return "RELIABILITY";
1029  }
1030  return NULL;
1031 }
1032 
1033 
1034 /* end of mq.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static void error_handler(void *cls, enum GNUNET_MQ_Error error)
We encountered an error handling the MQ to the ATS service.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static void done()
static struct GNUNET_CADET_MessageHandler handlers[]
Handlers, for diverse services.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
static void destroy(void *cls)
static struct GNUNET_IDENTITY_Handle * id
Handle to identity service.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void * GNUNET_CONTAINER_multihashmap32_get(const struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Given a key find a value in the map matching the key.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_remove_all(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Remove all entries for the given key from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_MAX(a, b)
uint16_t expected_size
Expected size of messages of this type.
GNUNET_MQ_MessageCallback cb
Callback, called every time a new message of the specified type has been received.
void * cls
Closure for mv and cb.
GNUNET_MQ_MessageValidationCallback mv
Callback to validate a message of the specified type.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
uint16_t type
Type of the message this handler covers, in host byte order.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MQ_Envelope *ev)
Send a copy of a message with the given message queue.
Definition: mq.c:374
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
Create a new envelope.
Definition: mq.c:520
const struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, uint16_t base_size)
Implementation of the #GNUNET_MQ_extract_nexted_mh macro.
Definition: mq.c:744
void GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this queue.
Definition: mq.c:875
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:772
void GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Remove env from the envelope DLL starting at env_head.
Definition: mq.c:950
void GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this envelope.
Definition: mq.c:834
unsigned int GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
Obtain the current length of the message queue.
Definition: mq.c:293
struct GNUNET_MQ_DestroyNotificationHandle * GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Register function to be called whenever mq is being destroyed.
Definition: mq.c:897
const struct GNUNET_MQ_Envelope * GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
Return next envelope in queue.
Definition: mq.c:890
GNUNET_MQ_Error
Error codes for the queue.
struct GNUNET_MQ_Envelope * GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
Remove the first envelope that has not yet been sent from the message queue and return it.
Definition: mq.c:348
void GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
Call the error handler of a message queue with the given error code.
Definition: mq.c:269
void(* GNUNET_MQ_ErrorHandler)(void *cls, enum GNUNET_MQ_Error error)
Generic error handler, called with the appropriate error code and the same closure specified at the c...
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
void(* GNUNET_MQ_DestroyImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Signature of functions implementing the destruction of a message queue.
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the last envelope in the queue.
Definition: mq.c:824
void GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:928
void * GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
Get the implementation state associated with the message queue.
Definition: mq.c:513
const char * GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
Convert an enum GNUNET_MQ_PreferenceType to a string.
Definition: mq.c:1017
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, GNUNET_MQ_CancelImpl cancel, void *impl_state, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue for the specified handlers.
Definition: mq.c:469
GNUNET_MQ_PreferenceKind
Enum defining all known preference categories.
void GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
Call the send implementation for the next queued message, if any.
Definition: mq.c:425
void * GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Remove the association for a request_id.
Definition: mq.c:626
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, enum GNUNET_MQ_PriorityPreferences p2)
Combine performance preferences set for different envelopes that are being combined into one larger e...
Definition: mq.c:856
void GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given m...
Definition: mq.c:187
GNUNET_MQ_PriorityPreferences
Per envelope preferences and priorities.
void GNUNET_MQ_destroy_notify_cancel(struct GNUNET_MQ_DestroyNotificationHandle *dnh)
Cancel registration from GNUNET_MQ_destroy_notify().
Definition: mq.c:915
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:642
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
Create a new envelope by copying an existing message.
Definition: mq.c:537
uint32_t GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
Associate the assoc_data in mq with a unique request id.
Definition: mq.c:579
void(* GNUNET_MQ_MessageCallback)(void *cls, const struct GNUNET_MessageHeader *msg)
Called when a message has been received.
enum GNUNET_GenericReturnValue GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given h...
Definition: mq.c:205
void * GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Get the data associated with a request_id in a queue.
Definition: mq.c:608
const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
Get the message that should currently be sent.
Definition: mq.c:504
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
Get performance preferences set for this envelope.
Definition: mq.c:843
void GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
Call the send notification for the current message, but do not try to send the next message until #gn...
Definition: mq.c:448
struct GNUNET_MQ_Envelope * GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
Function to copy an envelope.
Definition: mq.c:363
void GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
Change the closure argument in all of the handlers of the mq.
Definition: mq.c:493
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_MessageCallback agpl_handler, void *agpl_cls)
Copy an array of handlers, appending AGPL handler.
Definition: mq.c:979
void GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:939
void(* GNUNET_MQ_CancelImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Implementation function that cancels the currently sent message.
void(* GNUNET_MQ_SendImpl)(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state)
Signature of functions implementing the sending functionality of a message queue.
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Copy an array of handlers.
Definition: mq.c:961
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the current envelope from within GNUNET_MQ_SendImpl implementations.
Definition: mq.c:817
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh)
Implementation of the GNUNET_MQ_msg_nested_mh macro.
Definition: mq.c:552
unsigned int GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Count the handlers in a handler array.
Definition: mq.c:1004
const struct GNUNET_MessageHeader * GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
Obtain message contained in envelope.
Definition: mq.c:883
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:687
@ GNUNET_MQ_ERROR_MALFORMED
We received a message that was malformed and thus could not be passed to its handler.
@ GNUNET_MQ_PREFERENCE_RELIABILITY
The preferred transmission for this envelope foces on reliability.
@ GNUNET_MQ_PREFERENCE_NONE
No preference was expressed.
@ GNUNET_MQ_PREFERENCE_LATENCY
The preferred transmission for this envelope foces on minimizing latency.
@ GNUNET_MQ_PREFERENCE_BANDWIDTH
The preferred transmission for this envelope focuses on maximizing bandwidth.
@ GNUNET_MQ_PREF_OUT_OF_ORDER
Flag to indicate that out-of-order delivery is OK.
@ GNUNET_MQ_PRIORITY_MASK
Bit mask to apply to extract the priority bits.
@ GNUNET_MQ_PREF_CORK_ALLOWED
Flag to indicate that CORKing is acceptable.
@ GNUNET_MQ_PREF_UNRELIABLE
Flag to indicate that unreliable delivery is acceptable.
@ GNUNET_MQ_PREF_GOODPUT
Flag to indicate that high bandwidth is desired.
@ GNUNET_MQ_PREF_LOW_LATENCY
Flag to indicate that low latency is important.
#define GNUNET_MESSAGE_TYPE_REQUEST_AGPL
Message to request source code link.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1268
void(* GNUNET_SCHEDULER_TaskCallback)(void *cls)
Signature of the main function of a task.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:944
static void impl_send_continue(void *cls)
Task run to call the send implementation for the next queued message, if any.
Definition: mq.c:398
#define LOG(kind,...)
Definition: mq.c:30
static unsigned int size
Size of the "table".
Definition: peer.c:68
Internal representation of the hash map.
Handle we return for callbacks registered to be notified when GNUNET_MQ_destroy() is called on a queu...
Definition: mq.c:658
struct GNUNET_MQ_DestroyNotificationHandle * next
Kept in a DLL.
Definition: mq.c:667
struct GNUNET_MQ_DestroyNotificationHandle * prev
Kept in a DLL.
Definition: mq.c:662
GNUNET_SCHEDULER_TaskCallback cb
Function to call.
Definition: mq.c:677
void * cb_cls
Closure for cb.
Definition: mq.c:682
struct GNUNET_MQ_Handle * mq
Queue to notify about.
Definition: mq.c:672
struct GNUNET_MQ_Handle * parent_queue
Queue the message is queued in, NULL if message is not queued.
Definition: mq.c:57
void * sent_cls
Closure for send_cb.
Definition: mq.c:67
int have_custom_options
Did the application call GNUNET_MQ_env_set_options()?
Definition: mq.c:79
struct GNUNET_MessageHeader * mh
Actual allocated message header.
Definition: mq.c:52
struct GNUNET_MQ_Envelope * next
Messages are stored in a linked list.
Definition: mq.c:39
GNUNET_SCHEDULER_TaskCallback sent_cb
Called after the message was sent irrevocably.
Definition: mq.c:62
struct GNUNET_MQ_Envelope * prev
Messages are stored in a linked list Each queue has its own list of envelopes.
Definition: mq.c:45
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this envelope by GNUNET_MQ_env_set_options().
Definition: mq.c:74
Handle to a message queue.
Definition: mq.c:87
GNUNET_MQ_DestroyImpl destroy_impl
Implementation-dependent queue destruction function.
Definition: mq.c:102
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this queue by GNUNET_MQ_set_options().
Definition: mq.c:165
uint32_t assoc_id
Next id that should be used for the assoc_map, initialized lazily to a random value together with ass...
Definition: mq.c:172
struct GNUNET_MQ_DestroyNotificationHandle * dnh_head
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:154
struct GNUNET_MQ_Envelope * envelope_head
Linked list of messages pending to be sent.
Definition: mq.c:132
struct GNUNET_MQ_Envelope * current_envelope
Message that is currently scheduled to be sent.
Definition: mq.c:144
GNUNET_MQ_CancelImpl cancel_impl
Implementation-dependent send cancel function.
Definition: mq.c:107
GNUNET_MQ_ErrorHandler error_handler
Callback will be called when an error occurs.
Definition: mq.c:117
struct GNUNET_MQ_DestroyNotificationHandle * dnh_tail
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:159
unsigned int queue_length
Number of entries we have in the envelope-DLL.
Definition: mq.c:177
void * impl_state
Implementation-specific state.
Definition: mq.c:112
struct GNUNET_MQ_MessageHandler * handlers
Handlers array, or NULL if the queue should not receive messages.
Definition: mq.c:91
struct GNUNET_SCHEDULER_Task * send_task
Task to asynchronously run impl_send_continue().
Definition: mq.c:127
void * error_handler_cls
Closure for the error handler.
Definition: mq.c:122
GNUNET_MQ_SendImpl send_impl
Actual implementation of message sending, called when a message is added.
Definition: mq.c:97
struct GNUNET_MQ_Envelope * envelope_tail
Linked list of messages pending to be sent.
Definition: mq.c:137
struct GNUNET_CONTAINER_MultiHashMap32 * assoc_map
Map of associations, lazily allocated.
Definition: mq.c:149
bool in_flight
True if GNUNET_MQ_impl_send_in_flight() was called.
Definition: mq.c:182
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Entry in list of pending tasks.
Definition: scheduler.c:136
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model