GNUnet 0.21.2
mq.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2012-2024 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
27#include "platform.h"
28#include "gnunet_util_lib.h"
29
30#define LOG(kind, ...) GNUNET_log_from (kind, "util-mq", __VA_ARGS__)
31
32
34{
40
46
53
58
63
67 void *sent_cls;
68
75
80};
81
82
87{
92
98
103
108
113
118
123
128
133
138
145
150
155
160
166
172 uint32_t assoc_id;
173
177 unsigned int queue_length;
178
183};
184
185
186void
188 const struct GNUNET_MessageHeader *mh)
189{
191
193 mh);
194 if (GNUNET_SYSERR == ret)
195 {
196 GNUNET_break_op (0);
199 return;
200 }
201}
202
203
206 const struct GNUNET_MessageHeader *mh)
207{
208 bool handled = false;
209 uint16_t msize = ntohs (mh->size);
210 uint16_t mtype = ntohs (mh->type);
211
213 "Received message of type %u and size %u\n",
214 mtype,
215 msize);
216 if (NULL == handlers)
217 goto done;
218 for (const struct GNUNET_MQ_MessageHandler *handler = handlers;
219 NULL != handler->cb;
220 handler++)
221 {
222 if (handler->type == mtype)
223 {
224 handled = true;
225 if ( (handler->expected_size > msize) ||
226 ( (handler->expected_size != msize) &&
227 (NULL == handler->mv) ) )
228 {
229 /* Too small, or not an exact size and
230 no 'mv' handler to check rest */
232 "Received malformed message of type %u\n",
233 (unsigned int) handler->type);
234 return GNUNET_SYSERR;
235 }
236 if ( (NULL == handler->mv) ||
237 (GNUNET_OK ==
238 handler->mv (handler->cls,
239 mh)) )
240 {
241 /* message well-formed, pass to handler */
242 handler->cb (handler->cls, mh);
243 }
244 else
245 {
246 /* Message rejected by check routine */
248 "Received malformed message of type %u\n",
249 (unsigned int) handler->type);
250 return GNUNET_SYSERR;
251 }
252 break;
253 }
254 }
255done:
256 if (! handled)
257 {
259 "No handler for message of type %u and size %u\n",
260 mtype,
261 msize);
262 return GNUNET_NO;
263 }
264 return GNUNET_OK;
265}
266
267
268void
270 enum GNUNET_MQ_Error error)
271{
272 if (NULL == mq->error_handler)
273 {
275 "Got error %d, but no handler installed\n",
276 (int) error);
277 return;
278 }
280 error);
281}
282
283
/**
 * GNUNET_MQ_discard(): free an envelope that is not queued anywhere.
 * (The line carrying the function name and its `ev` parameter is absent
 * from this listing; see the cross-reference entry for GNUNET_MQ_discard.)
 *
 * The envelope must not belong to a queue: the assertion below fails if
 * `ev->parent_queue` is still set.
 *
 * @param ev envelope to release; invalid after this call
 */
284void
286{
287 GNUNET_assert (NULL == ev->parent_queue);
288 GNUNET_free (ev);
289}
290
291
292unsigned int
294{
295 if (! mq->in_flight)
296 {
297 return mq->queue_length;
298 }
300 return mq->queue_length - 1;
301}
302
303
304void
306 struct GNUNET_MQ_Envelope *ev)
307{
308 GNUNET_assert (NULL != mq);
309 GNUNET_assert (NULL == ev->parent_queue);
310
311 mq->queue_length++;
312 if (mq->queue_length >= 10000000)
313 {
314 /* This would seem like a bug... */
316 "MQ with %u entries extended by message of type %u (FC broken?)\n",
317 (unsigned int) mq->queue_length,
318 (unsigned int) ntohs (ev->mh->type));
319 }
320 ev->parent_queue = mq;
321 /* is the implementation busy? queue it! */
322 if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
323 {
326 ev);
327 return;
328 }
329 else if (NULL != mq->envelope_head)
330 {
333 ev);
334
335 ev = mq->envelope_head;
338 ev);
339 }
340 mq->current_envelope = ev;
341
343 "sending message of type %u and size %u, queue empty (MQ: %p)\n",
344 ntohs (ev->mh->type),
345 ntohs (ev->mh->size),
346 mq);
347
348 mq->send_impl (mq,
349 ev->mh,
350 mq->impl_state);
351}
352
353
354struct GNUNET_MQ_Envelope *
356{
358 GNUNET_assert (NULL != mq->envelope_head);
359 GNUNET_assert (NULL != mq->envelope_tail);
360 struct GNUNET_MQ_Envelope *env;
361
365 env);
366 mq->queue_length--;
367 env->parent_queue = NULL;
368 return env;
369}
370
371
372struct GNUNET_MQ_Envelope *
374{
375 GNUNET_assert (NULL == env->next);
376 GNUNET_assert (NULL == env->parent_queue);
377 GNUNET_assert (NULL == env->sent_cb);
379 return GNUNET_MQ_msg_copy (env->mh);
380}
381
382
383void
385 const struct GNUNET_MQ_Envelope *ev)
386{
387 GNUNET_assert (NULL != ev);
388 struct GNUNET_MQ_Envelope *env;
389 uint16_t msize;
390
391 msize = ntohs (ev->mh->size);
392 env = GNUNET_malloc (sizeof(struct GNUNET_MQ_Envelope) + msize);
393 env->mh = (struct GNUNET_MessageHeader *) &env[1];
394 env->sent_cb = ev->sent_cb;
395 env->sent_cls = ev->sent_cls;
396 GNUNET_memcpy (&env[1], ev->mh, msize);
398}
399
400
408static void
410{
411 struct GNUNET_MQ_Handle *mq = cls;
412 GNUNET_assert (NULL != mq->send_task);
413
414 mq->send_task = NULL;
415 /* call is only valid if we're actually currently sending
416 * a message */
417 if (NULL == mq->envelope_head)
418 return;
423
425 "sending message of type %u and size %u from queue (MQ: %p)\n",
426 ntohs (mq->current_envelope->mh->type),
427 ntohs (mq->current_envelope->mh->size),
428 mq);
429
430 mq->send_impl (mq,
432 mq->impl_state);
433}
434
435
436void
438{
439 struct GNUNET_MQ_Envelope *current_envelope;
441
443 mq->queue_length--;
444 mq->in_flight = false;
445 current_envelope = mq->current_envelope;
446 GNUNET_assert (NULL != current_envelope);
447 current_envelope->parent_queue = NULL;
448 mq->current_envelope = NULL;
449 GNUNET_assert (NULL == mq->send_task);
451 if (NULL != (cb = current_envelope->sent_cb))
452 {
453 current_envelope->sent_cb = NULL;
454 cb (current_envelope->sent_cls);
455 }
456 GNUNET_free (current_envelope);
457}
458
459
460void
462{
463 struct GNUNET_MQ_Envelope *current_envelope;
465
466 mq->in_flight = true;
467 /* call is only valid if we're actually currently sending
468 * a message */
469 current_envelope = mq->current_envelope;
470 GNUNET_assert (NULL != current_envelope);
471 /* can't call cancel from now on anymore */
472 current_envelope->parent_queue = NULL;
473 if (NULL != (cb = current_envelope->sent_cb))
474 {
475 current_envelope->sent_cb = NULL;
476 cb (current_envelope->sent_cls);
477 }
478}
479
480
481struct GNUNET_MQ_Handle *
485 void *impl_state,
488 void *error_handler_cls)
489{
490 struct GNUNET_MQ_Handle *mq;
491
492 mq = GNUNET_new (struct GNUNET_MQ_Handle);
493 mq->send_impl = send;
495 mq->cancel_impl = cancel;
500
501 return mq;
502}
503
504
/**
 * GNUNET_MQ_set_handlers_closure(): replace the closure of every handler
 * registered on a queue.
 * (The line carrying the function name and the `mq` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * Does nothing when the queue has no handler array. Otherwise the array is
 * walked until the first entry whose `cb` is NULL, and each entry's `cls`
 * is overwritten.
 *
 * @param mq queue whose handlers are updated
 * @param handlers_cls new closure stored in each handler's `cls` field
 */
505void
507 void *handlers_cls)
508{
509 if (NULL == mq->handlers)
510 return;
511 for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
512 mq->handlers[i].cls = handlers_cls;
513}
514
515
516const struct GNUNET_MessageHeader *
518{
521 return mq->current_envelope->mh;
522}
523
524
/**
 * GNUNET_MQ_impl_state(): return the implementation-specific state pointer
 * stored in the queue.
 * (The line carrying the function name and the `mq` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * @param mq queue to query
 * @return the queue's `impl_state` field, unchanged
 */
525void *
527{
528 return mq->impl_state;
529}
530
531
/**
 * GNUNET_MQ_msg_(): allocate a new envelope together with its message.
 * (The line carrying the function name and the `mhp` parameter is absent
 * from this listing; see the cross-reference entry for GNUNET_MQ_msg_.)
 *
 * One GNUNET_malloc() block holds the envelope followed directly by the
 * message, so `ev->mh` points just past the envelope struct.
 *
 * NOTE(review): nothing here checks that `size` is at least the size of a
 * message header before the header fields are written; presumably callers
 * guarantee that -- confirm.
 *
 * @param mhp if non-NULL, receives a pointer to the new message header
 * @param size number of message bytes allocated after the envelope; also
 *        stored (via htons) in the header's size field
 * @param type stored (via htons) in the header's type field
 * @return the newly allocated envelope
 */
532struct GNUNET_MQ_Envelope *
534 uint16_t size,
535 uint16_t type)
536{
537 struct GNUNET_MQ_Envelope *ev;
538
539 ev = GNUNET_malloc (size + sizeof(struct GNUNET_MQ_Envelope));
/* the message lives in the same allocation, right behind the envelope */
540 ev->mh = (struct GNUNET_MessageHeader *) &ev[1];
541 ev->mh->size = htons (size);
542 ev->mh->type = htons (type);
543 if (NULL != mhp)
544 *mhp = ev->mh;
545 return ev;
546}
547
548
/**
 * GNUNET_MQ_msg_copy(): create a new envelope holding a copy of an
 * existing message.
 * (The line carrying the function name and the `hdr` parameter is absent
 * from this listing; see the cross-reference entry for GNUNET_MQ_msg_copy.)
 *
 * The number of bytes copied is taken from the size field of `hdr`
 * (converted with ntohs), so `hdr` must point to at least that many bytes.
 * Only `mh` is set explicitly; the other envelope fields keep whatever
 * GNUNET_malloc() returned -- presumably zeroed memory, confirm.
 *
 * @param hdr message to copy
 * @return new envelope whose message is a byte-for-byte copy of `hdr`
 */
549struct GNUNET_MQ_Envelope *
551{
552 struct GNUNET_MQ_Envelope *mqm;
553 uint16_t size = ntohs (hdr->size);
554
555 mqm = GNUNET_malloc (sizeof(*mqm) + size);
/* message storage directly follows the envelope in the same allocation */
556 mqm->mh = (struct GNUNET_MessageHeader *) &mqm[1];
557 GNUNET_memcpy (mqm->mh,
558 hdr,
559 size);
560 return mqm;
561}
562
563
/**
 * GNUNET_MQ_msg_nested_mh_(): allocate an envelope whose message consists
 * of `base_size` leading bytes followed by a copy of a nested message.
 * (The line carrying the function name and the `mhp` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * With a NULL `nested_mh` this is exactly GNUNET_MQ_msg_ (mhp, base_size,
 * type). Otherwise the total size is base_size plus the nested message's
 * size field; if that sum does not fit in 16 bits, NULL is returned and
 * `*mhp` is left untouched.
 *
 * @param mhp passed through to GNUNET_MQ_msg_
 * @param base_size size of the outer message without the nested part
 * @param type message type of the outer message
 * @param nested_mh message to append after the first `base_size` bytes,
 *        or NULL for none
 * @return the new envelope, or NULL on 16-bit size overflow
 */
564struct GNUNET_MQ_Envelope *
566 uint16_t base_size,
567 uint16_t type,
568 const struct GNUNET_MessageHeader *nested_mh)
569{
570 struct GNUNET_MQ_Envelope *mqm;
571 uint16_t size;
572
573 if (NULL == nested_mh)
574 return GNUNET_MQ_msg_ (mhp,
575 base_size,
576 type);
/* the sum is truncated to uint16_t on assignment, so it can wrap around */
577 size = base_size + ntohs (nested_mh->size);
578 /* check for uint16_t overflow */
579 if (size < base_size)
580 return NULL;
581 mqm = GNUNET_MQ_msg_ (mhp,
582 size,
583 type);
/* nested message is placed right after the outer message's base part */
584 GNUNET_memcpy ((char *) mqm->mh + base_size,
585 nested_mh,
586 ntohs (nested_mh->size));
587 return mqm;
588}
589
590
591uint32_t
593 void *assoc_data)
594{
595 uint32_t id;
596
597 if (NULL == mq->assoc_map)
598 {
600 mq->assoc_id = 1;
601 }
602 id = mq->assoc_id++;
605 mq->assoc_map,
606 id,
607 assoc_data,
609 return id;
610}
611
612
620void *
622 uint32_t request_id)
623{
624 if (NULL == mq->assoc_map)
625 return NULL;
627 request_id);
628}
629
630
638void *
640 uint32_t request_id)
641{
642 void *val;
643
644 if (NULL == mq->assoc_map)
645 return NULL;
647 request_id);
649 request_id);
650 return val;
651}
652
653
/**
 * GNUNET_MQ_notify_sent(): set or clear the "sent" callback of an envelope.
 * (The lines carrying the function name and the `ev` and `cb` parameters
 * are absent from this listing; see the cross-reference entry for
 * GNUNET_MQ_notify_sent.)
 *
 * @param ev envelope to modify
 * @param cb callback to store in `ev->sent_cb`, or NULL to clear it
 * @param cb_cls closure stored in `ev->sent_cls`
 */
654void
657 void *cb_cls)
658{
659 /* allow setting *OR* clearing callback */
/* i.e. replacing an existing non-NULL callback with another one asserts */
660 GNUNET_assert ((NULL == ev->sent_cb) || (NULL == cb));
661 ev->sent_cb = cb;
662 ev->sent_cls = cb_cls;
663}
664
665
671{
676
681
686
691
695 void *cb_cls;
696};
697
698
699void
701{
703
704 if (NULL != mq->destroy_impl)
705 {
707 }
708 if (NULL != mq->send_task)
709 {
711 mq->send_task = NULL;
712 }
713 while (NULL != mq->envelope_head)
714 {
715 struct GNUNET_MQ_Envelope *ev;
716
717 ev = mq->envelope_head;
718 ev->parent_queue = NULL;
721 mq->queue_length--;
723 "MQ destroy drops message of type %u\n",
724 ntohs (ev->mh->type));
726 }
727 if (NULL != mq->current_envelope)
728 {
729 /* we can only discard envelopes that
730 * are not queued! */
733 "MQ destroy drops current message of type %u\n",
734 ntohs (mq->current_envelope->mh->type));
736 mq->current_envelope = NULL;
738 mq->queue_length--;
739 }
741 while (NULL != (dnh = mq->dnh_head))
742 {
743 dnh->cb (dnh->cb_cls);
745 }
746 if (NULL != mq->assoc_map)
747 {
749 mq->assoc_map = NULL;
750 }
752 GNUNET_free (mq);
753}
754
755
/**
 * GNUNET_MQ_extract_nested_mh_(): locate the message nested behind the
 * first `base_size` bytes of `mh`.
 * (The line carrying the function name and the `mh` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * The nested message must fill the remainder of `mh` exactly: its own size
 * field has to equal the outer size minus `base_size`.
 *
 * @param mh outer message; its size field must be at least `base_size`
 *        (checked with GNUNET_assert)
 * @param base_size number of leading bytes belonging to the outer message
 * @return pointer into `mh` at offset `base_size`, or NULL if there are no
 *         trailing bytes, too few for a message header, or the nested size
 *         field does not match (the latter two also hit GNUNET_break_op)
 */
756const struct GNUNET_MessageHeader *
758 uint16_t base_size)
759{
760 uint16_t whole_size;
761 uint16_t nested_size;
762 const struct GNUNET_MessageHeader *nested_msg;
763
764 whole_size = ntohs (mh->size);
765 GNUNET_assert (whole_size >= base_size);
766 nested_size = whole_size - base_size;
/* nothing follows the base part: no nested message present */
767 if (0 == nested_size)
768 return NULL;
/* trailing bytes exist but cannot even hold a header */
769 if (nested_size < sizeof(struct GNUNET_MessageHeader))
770 {
771 GNUNET_break_op (0);
772 return NULL;
773 }
774 nested_msg = (const struct GNUNET_MessageHeader *) ((char *) mh + base_size);
/* the nested header must claim exactly the remaining bytes */
775 if (ntohs (nested_msg->size) != nested_size)
776 {
777 GNUNET_break_op (0);
778 return NULL;
779 }
780 return nested_msg;
781}
782
783
784void
786{
787 struct GNUNET_MQ_Handle *mq = ev->parent_queue;
788
789 GNUNET_assert (NULL != mq);
790 GNUNET_assert (NULL != mq->cancel_impl);
792 mq->queue_length--;
793 if (mq->current_envelope == ev)
794 {
795 /* complex case, we already started with transmitting
796 the message using the callbacks. */
798 mq->cancel_impl (mq,
799 mq->impl_state);
800 /* continue sending the next message, if any */
802 if (NULL != mq->current_envelope)
803 {
808 "sending canceled message of type %u queue\n",
809 ntohs (ev->mh->type));
810 mq->send_impl (mq,
812 mq->impl_state);
813 }
814 }
815 else
816 {
817 /* simple case, message is still waiting in the queue */
820 ev);
821 }
822 ev->parent_queue = NULL;
823 ev->mh = NULL;
824 /* also frees ev */
825 GNUNET_free (ev);
826}
827
828
/**
 * GNUNET_MQ_get_current_envelope(): return the queue's current envelope.
 * (The line carrying the function name and the `mq` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * @param mq queue to query
 * @return `mq->current_envelope`, which is NULL when no envelope is current
 */
829struct GNUNET_MQ_Envelope *
831{
832 return mq->current_envelope;
833}
834
835
/**
 * GNUNET_MQ_get_last_envelope(): return the last envelope known to the
 * queue.
 * (The line carrying the function name and the `mq` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * @param mq queue to query
 * @return the tail of the pending-envelope list if that list is non-empty,
 *         otherwise `mq->current_envelope` (which may itself be NULL)
 */
836struct GNUNET_MQ_Envelope *
838{
839 if (NULL != mq->envelope_tail)
840 return mq->envelope_tail;
841
/* nothing pending: fall back to the current envelope */
842 return mq->current_envelope;
843}
844
845
846void
849{
850 env->priority = pp;
852}
853
854
857{
859
861 return env->priority;
862 if (NULL == mq)
863 return 0;
864 return mq->priority;
865}
866
867
871{
873
876 ret |=
878 ret |=
881 ret |=
883 return ret;
884}
885
886
/**
 * GNUNET_MQ_set_options(): store priority/preference flags on a queue.
 * (The lines carrying the function name and both parameters are absent
 * from this listing; the cross-reference entry gives them as
 * `struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp`.)
 *
 * @param mq queue to modify
 * @param pp value written to `mq->priority`
 */
887void
890{
891 mq->priority = pp;
892}
893
894
/**
 * GNUNET_MQ_env_get_msg(): return the message stored in an envelope.
 * (The line carrying the function name and the `env` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * @param env envelope to query
 * @return `env->mh`; no copy is made
 */
895const struct GNUNET_MessageHeader *
897{
898 return env->mh;
899}
900
901
/**
 * GNUNET_MQ_env_next(): return the envelope linked after `env`.
 * (The line carrying the function name and the `env` parameter is absent
 * from this listing; see the cross-reference entry for this function.)
 *
 * @param env envelope to query
 * @return `env->next`
 */
902const struct GNUNET_MQ_Envelope *
904{
905 return env->next;
906}
907
908
912 void *cb_cls)
913{
915
917 dnh->mq = mq;
918 dnh->cb = cb;
919 dnh->cb_cls = cb_cls;
921 mq->dnh_tail,
922 dnh);
923 return dnh;
924}
925
926
927void
930{
931 struct GNUNET_MQ_Handle *mq = dnh->mq;
932
934 mq->dnh_tail,
935 dnh);
936 GNUNET_free (dnh);
937}
938
939
940void
942 struct GNUNET_MQ_Envelope **env_tail,
943 struct GNUNET_MQ_Envelope *env)
944{
946 *env_tail,
947 env);
948}
949
950
951void
953 struct GNUNET_MQ_Envelope **env_tail,
954 struct GNUNET_MQ_Envelope *env)
955{
957 *env_tail,
958 env);
959}
960
961
962void
964 struct GNUNET_MQ_Envelope **env_tail,
965 struct GNUNET_MQ_Envelope *env)
966{
968 *env_tail,
969 env);
970}
971
972
975{
976 struct GNUNET_MQ_MessageHandler *copy;
977 unsigned int count;
978
979 if (NULL == handlers)
980 return NULL;
982 copy = GNUNET_new_array (count + 1,
984 GNUNET_memcpy (copy,
985 handlers,
986 count * sizeof(struct GNUNET_MQ_MessageHandler));
987 return copy;
988}
989
990
993 GNUNET_MQ_MessageCallback agpl_handler,
994 void *agpl_cls)
995{
996 struct GNUNET_MQ_MessageHandler *copy;
997 unsigned int count;
998
999 if (NULL == handlers)
1000 return NULL;
1002 copy = GNUNET_new_array (count + 2,
1004 GNUNET_memcpy (copy,
1005 handlers,
1006 count * sizeof(struct GNUNET_MQ_MessageHandler));
1007 copy[count].mv = NULL;
1008 copy[count].cb = agpl_handler;
1009 copy[count].cls = agpl_cls;
1011 copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1012 return copy;
1013}
1014
1015
/**
 * GNUNET_MQ_count_handlers(): count the entries of a handler array.
 * (The line carrying the function name and the `handlers` parameter is
 * absent from this listing; see the cross-reference entry for this
 * function.)
 *
 * The array is treated as terminated by the first entry whose `cb` is
 * NULL; that terminator is not counted.
 *
 * @param handlers handler array, or NULL
 * @return number of entries before the terminator; 0 for a NULL array
 */
1016unsigned int
1018{
1019 unsigned int i;
1020
1021 if (NULL == handlers)
1022 return 0;
/* empty loop body: advance i until the terminating entry is reached */
1023 for (i = 0; NULL != handlers[i].cb; i++)
1024 ;
1025 return i;
1026}
1027
1028
1029const char *
1031{
1032 switch (type)
1033 {
1035 return "NONE";
1037 return "BANDWIDTH";
1039 return "LATENCY";
1041 return "RELIABILITY";
1042 }
1043 return NULL;
1044}
1045
1046
1047/* end of mq.c */
struct GNUNET_MQ_MessageHandlers handlers[]
Definition: 003.c:1
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static void error_handler(void *cls, enum GNUNET_MQ_Error error)
Function called on connection trouble.
static int ret
Final status code.
Definition: gnunet-arm.c:94
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
static struct GNUNET_IDENTITY_Handle * id
Handle to IDENTITY.
static uint32_t type
Type string converted to DNS type value.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void * GNUNET_CONTAINER_multihashmap32_get(const struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Given a key find a value in the map matching the key.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_remove_all(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Remove all entries for the given key from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_log(kind,...)
#define GNUNET_MAX(a, b)
uint16_t expected_size
Expected size of messages of this type.
GNUNET_MQ_MessageCallback cb
Callback, called every time a new message of the specified type has been received.
void * cls
Closure for mv and cb.
GNUNET_MQ_MessageValidationCallback mv
Callback to validate a message of the specified type.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
uint16_t type
Type of the message this handler covers, in host byte order.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MQ_Envelope *ev)
Send a copy of a message with the given message queue.
Definition: mq.c:384
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
Create a new envelope by copying an existing message.
Definition: mq.c:550
struct GNUNET_MQ_Envelope * GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
Function to copy an envelope.
Definition: mq.c:373
void GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this queue.
Definition: mq.c:888
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:785
void GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Remove env from the envelope DLL starting at env_head.
Definition: mq.c:963
void GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific options for this envelope.
Definition: mq.c:847
unsigned int GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
Obtain the current length of the message queue.
Definition: mq.c:293
const struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, uint16_t base_size)
Implementation of the #GNUNET_MQ_extract_nested_mh macro.
Definition: mq.c:757
GNUNET_MQ_Error
Error codes for the queue.
struct GNUNET_MQ_Envelope * GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
Remove the first envelope that has not yet been sent from the message queue and return it.
Definition: mq.c:355
void GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
Call the error handler of a message queue with the given error code.
Definition: mq.c:269
struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, GNUNET_MQ_CancelImpl cancel, void *impl_state, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue for the specified handlers.
Definition: mq.c:482
void(* GNUNET_MQ_ErrorHandler)(void *cls, enum GNUNET_MQ_Error error)
Generic error handler, called with the appropriate error code and the same closure specified at the c...
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:305
void(* GNUNET_MQ_DestroyImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Signature of functions implementing the destruction of a message queue.
void GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head. Note that env must not be in any MQ while this ...
Definition: mq.c:941
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_MessageCallback agpl_handler, void *agpl_cls)
Copy an array of handlers, appending AGPL handler.
Definition: mq.c:992
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Copy an array of handlers.
Definition: mq.c:974
GNUNET_MQ_PreferenceKind
Enum defining all known preference categories.
void GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
Call the send implementation for the next queued message, if any.
Definition: mq.c:437
const char * GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
Convert an enum GNUNET_MQ_PreferenceKind to a string.
Definition: mq.c:1030
void * GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Get the data associated with a request_id in a queue.
Definition: mq.c:621
struct GNUNET_MQ_DestroyNotificationHandle * GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Register function to be called whenever mq is being destroyed.
Definition: mq.c:910
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, enum GNUNET_MQ_PriorityPreferences p2)
Combine performance preferences set for different envelopes that are being combined into one larger e...
Definition: mq.c:869
const struct GNUNET_MQ_Envelope * GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
Return next envelope in queue.
Definition: mq.c:903
void GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
Call the message handler that was registered for the type of the given message in the given m...
Definition: mq.c:187
GNUNET_MQ_PriorityPreferences
Per envelope preferences and priorities.
enum GNUNET_GenericReturnValue GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, const struct GNUNET_MessageHeader *mh)
Call the message handler that was registered for the type of the given message in the given h...
Definition: mq.c:205
void GNUNET_MQ_destroy_notify_cancel(struct GNUNET_MQ_DestroyNotificationHandle *dnh)
Cancel registration from GNUNET_MQ_destroy_notify().
Definition: mq.c:928
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:655
uint32_t GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
Associate the assoc_data in mq with a unique request id.
Definition: mq.c:592
void(* GNUNET_MQ_MessageCallback)(void *cls, const struct GNUNET_MessageHeader *msg)
Called when a message has been received.
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
Create a new envelope.
Definition: mq.c:533
const struct GNUNET_MessageHeader * GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
Obtain message contained in envelope.
Definition: mq.c:896
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
Get performance preferences set for this envelope.
Definition: mq.c:856
void GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
Call the send notification for the current message, but do not try to send the next message until #gn...
Definition: mq.c:461
void * GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Remove the association for a request_id.
Definition: mq.c:639
const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
Get the message that should currently be sent.
Definition: mq.c:517
void GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
Change the closure argument in all of the handlers of the mq.
Definition: mq.c:506
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the last envelope in the queue.
Definition: mq.c:837
void * GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
Get the implementation state associated with the message queue.
Definition: mq.c:526
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh)
Implementation of the GNUNET_MQ_msg_nested_mh macro.
Definition: mq.c:565
void GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head. Note that env must not be in any MQ while this ...
Definition: mq.c:952
void(* GNUNET_MQ_CancelImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Implementation function that cancels the currently sent message.
void(* GNUNET_MQ_SendImpl)(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state)
Signature of functions implementing the sending functionality of a message queue.
unsigned int GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Count the handlers in a handler array.
Definition: mq.c:1017
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the current envelope from within GNUNET_MQ_SendImpl implementations.
Definition: mq.c:830
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:700
@ GNUNET_MQ_ERROR_MALFORMED
We received a message that was malformed and thus could not be passed to its handler.
@ GNUNET_MQ_PREFERENCE_RELIABILITY
The preferred transmission for this envelope focuses on reliability.
@ GNUNET_MQ_PREFERENCE_NONE
No preference was expressed.
@ GNUNET_MQ_PREFERENCE_LATENCY
The preferred transmission for this envelope focuses on minimizing latency.
@ GNUNET_MQ_PREFERENCE_BANDWIDTH
The preferred transmission for this envelope focuses on maximizing bandwidth.
@ GNUNET_MQ_PREF_OUT_OF_ORDER
Flag to indicate that out-of-order delivery is OK.
@ GNUNET_MQ_PRIORITY_MASK
Bit mask to apply to extract the priority bits.
@ GNUNET_MQ_PREF_CORK_ALLOWED
Flag to indicate that CORKing is acceptable.
@ GNUNET_MQ_PREF_UNRELIABLE
Flag to indicate that unreliable delivery is acceptable.
@ GNUNET_MQ_PREF_GOODPUT
Flag to indicate that high bandwidth is desired.
@ GNUNET_MQ_PREF_LOW_LATENCY
Flag to indicate that low latency is important.
#define GNUNET_MESSAGE_TYPE_REQUEST_AGPL
Message to request source code link.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:981
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1305
void(* GNUNET_SCHEDULER_TaskCallback)(void *cls)
Signature of the main function of a task.
static void destroy(void *cls)
Function to destroy a microphone.
Definition: microphone.c:153
static void impl_send_continue(void *cls)
Task run to call the send implementation for the next queued message, if any.
Definition: mq.c:409
#define LOG(kind,...)
Definition: mq.c:30
static unsigned int size
Size of the "table".
Definition: peer.c:68
Internal representation of the hash map.
Handle we return for callbacks registered to be notified when GNUNET_MQ_destroy() is called on a queu...
Definition: mq.c:671
struct GNUNET_MQ_DestroyNotificationHandle * next
Kept in a DLL.
Definition: mq.c:680
struct GNUNET_MQ_DestroyNotificationHandle * prev
Kept in a DLL.
Definition: mq.c:675
GNUNET_SCHEDULER_TaskCallback cb
Function to call.
Definition: mq.c:690
void * cb_cls
Closure for cb.
Definition: mq.c:695
struct GNUNET_MQ_Handle * mq
Queue to notify about.
Definition: mq.c:685
struct GNUNET_MQ_Handle * parent_queue
Queue the message is queued in, NULL if message is not queued.
Definition: mq.c:57
void * sent_cls
Closure for sent_cb.
Definition: mq.c:67
int have_custom_options
Did the application call GNUNET_MQ_env_set_options()?
Definition: mq.c:79
struct GNUNET_MessageHeader * mh
Actual allocated message header.
Definition: mq.c:52
struct GNUNET_MQ_Envelope * next
Messages are stored in a linked list.
Definition: mq.c:39
GNUNET_SCHEDULER_TaskCallback sent_cb
Called after the message was sent irrevocably.
Definition: mq.c:62
struct GNUNET_MQ_Envelope * prev
Messages are stored in a linked list. Each queue has its own list of envelopes.
Definition: mq.c:45
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this envelope by GNUNET_MQ_env_set_options().
Definition: mq.c:74
Handle to a message queue.
Definition: mq.c:87
GNUNET_MQ_DestroyImpl destroy_impl
Implementation-dependent queue destruction function.
Definition: mq.c:102
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this queue by GNUNET_MQ_set_options().
Definition: mq.c:165
uint32_t assoc_id
Next id that should be used for the assoc_map, initialized lazily to a random value together with ass...
Definition: mq.c:172
struct GNUNET_MQ_DestroyNotificationHandle * dnh_head
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:154
struct GNUNET_MQ_Envelope * envelope_head
Linked list of messages pending to be sent.
Definition: mq.c:132
struct GNUNET_MQ_Envelope * current_envelope
Message that is currently scheduled to be sent.
Definition: mq.c:144
GNUNET_MQ_CancelImpl cancel_impl
Implementation-dependent send cancel function.
Definition: mq.c:107
GNUNET_MQ_ErrorHandler error_handler
Callback will be called when an error occurs.
Definition: mq.c:117
struct GNUNET_MQ_DestroyNotificationHandle * dnh_tail
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:159
unsigned int queue_length
Number of entries we have in the envelope-DLL.
Definition: mq.c:177
void * impl_state
Implementation-specific state.
Definition: mq.c:112
struct GNUNET_MQ_MessageHandler * handlers
Handlers array, or NULL if the queue should not receive messages.
Definition: mq.c:91
struct GNUNET_SCHEDULER_Task * send_task
Task to asynchronously run impl_send_continue().
Definition: mq.c:127
void * error_handler_cls
Closure for the error handler.
Definition: mq.c:122
GNUNET_MQ_SendImpl send_impl
Actual implementation of message sending, called when a message is added.
Definition: mq.c:97
struct GNUNET_MQ_Envelope * envelope_tail
Linked list of messages pending to be sent.
Definition: mq.c:137
struct GNUNET_CONTAINER_MultiHashMap32 * assoc_map
Map of associations, lazily allocated.
Definition: mq.c:149
bool in_flight
True if GNUNET_MQ_impl_send_in_flight() was called.
Definition: mq.c:182
Message handler for a specific message type.
Header for all communications.
Entry in list of pending tasks.
Definition: scheduler.c:136