GNUnet  0.10.x
mq.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2012-2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 
29 #define LOG(kind, ...) GNUNET_log_from(kind, "util-mq", __VA_ARGS__)
30 
31 
38 
44 
51 
56 
61 
65  void *sent_cls;
66 
73 
78 };
79 
80 
89 
95 
100 
105 
109  void *impl_state;
110 
115 
120 
125 
130 
135 
142 
147 
152 
157 
163 
169  uint32_t assoc_id;
170 
174  unsigned int queue_length;
175 
181 
186 };
187 
188 
199 void
201  const struct GNUNET_MessageHeader *mh)
202 {
203  int ret;
204 
205  ret = GNUNET_MQ_handle_message(mq->handlers, mh);
206  if (GNUNET_SYSERR == ret)
207  {
209  return;
210  }
211 }
212 
213 
226 int
228  const struct GNUNET_MessageHeader *mh)
229 {
230  const struct GNUNET_MQ_MessageHandler *handler;
231  int handled = GNUNET_NO;
232  uint16_t msize = ntohs(mh->size);
233  uint16_t mtype = ntohs(mh->type);
234 
236  "Received message of type %u and size %u\n",
237  mtype,
238  msize);
239 
240  if (NULL == handlers)
241  goto done;
242  for (handler = handlers; NULL != handler->cb; handler++)
243  {
244  if (handler->type == mtype)
245  {
246  handled = GNUNET_YES;
247  if ((handler->expected_size > msize) ||
248  ((handler->expected_size != msize) && (NULL == handler->mv)))
249  {
250  /* Too small, or not an exact size and
251  no 'mv' handler to check rest */
253  "Received malformed message of type %u\n",
254  (unsigned int)handler->type);
255  return GNUNET_SYSERR;
256  }
257  if ((NULL == handler->mv) ||
258  (GNUNET_OK == handler->mv(handler->cls, mh)))
259  {
260  /* message well-formed, pass to handler */
261  handler->cb(handler->cls, mh);
262  }
263  else
264  {
265  /* Message rejected by check routine */
267  "Received malformed message of type %u\n",
268  (unsigned int)handler->type);
269  return GNUNET_SYSERR;
270  }
271  break;
272  }
273  }
274 done:
275  if (GNUNET_NO == handled)
276  {
278  "No handler for message of type %u and size %u\n",
279  mtype,
280  msize);
281  return GNUNET_NO;
282  }
283  return GNUNET_OK;
284 }
285 
286 
297 void
299 {
300  if (NULL == mq->error_handler)
301  {
303  "Got error %d, but no handler installed\n",
304  (int)error);
305  return;
306  }
307  mq->error_handler(mq->error_handler_cls, error);
308 }
309 
310 
318 void
320 {
321  GNUNET_assert(NULL == ev->parent_queue);
322  GNUNET_free(ev);
323 }
324 
325 
332 unsigned int
334 {
335  if (GNUNET_YES != mq->in_flight)
336  {
337  return mq->queue_length;
338  }
339  return mq->queue_length - 1;
340 }
341 
342 
350 void
352 {
353  GNUNET_assert(NULL != mq);
354  GNUNET_assert(NULL == ev->parent_queue);
355 
356  mq->queue_length++;
357  if (mq->queue_length >= 10000)
358  {
359  /* This would seem like a bug... */
361  "MQ with %u entries extended by message of type %u (FC broken?)\n",
362  (unsigned int)mq->queue_length,
363  (unsigned int)ntohs(ev->mh->type));
364  }
365  ev->parent_queue = mq;
366  /* is the implementation busy? queue it! */
367  if ((NULL != mq->current_envelope) || (NULL != mq->send_task))
368  {
370  return;
371  }
372  GNUNET_assert(NULL == mq->envelope_head);
373  mq->current_envelope = ev;
374 
376  "sending message of type %u, queue empty (MQ: %p)\n",
377  ntohs(ev->mh->type),
378  mq);
379 
380  mq->send_impl(mq, ev->mh, mq->impl_state);
381 }
382 
383 
391 struct GNUNET_MQ_Envelope *
393 {
394  struct GNUNET_MQ_Envelope *env;
395 
396  env = mq->envelope_head;
398  mq->queue_length--;
399  env->parent_queue = NULL;
400  return env;
401 }
402 
403 
411 struct GNUNET_MQ_Envelope *
413 {
414  GNUNET_assert(NULL == env->next);
415  GNUNET_assert(NULL == env->parent_queue);
416  GNUNET_assert(NULL == env->sent_cb);
418  return GNUNET_MQ_msg_copy(env->mh);
419 }
420 
421 
429 void
431  const struct GNUNET_MQ_Envelope *ev)
432 {
433  struct GNUNET_MQ_Envelope *env;
434  uint16_t msize;
435 
436  msize = ntohs(ev->mh->size);
437  env = GNUNET_malloc(sizeof(struct GNUNET_MQ_Envelope) + msize);
438  env->mh = (struct GNUNET_MessageHeader *)&env[1];
439  env->sent_cb = ev->sent_cb;
440  env->sent_cls = ev->sent_cls;
441  GNUNET_memcpy(&env[1], ev->mh, msize);
442  GNUNET_MQ_send(mq, env);
443 }
444 
445 
453 static void
455 {
456  struct GNUNET_MQ_Handle *mq = cls;
457 
458  mq->send_task = NULL;
459  /* call is only valid if we're actually currently sending
460  * a message */
461  if (NULL == mq->envelope_head)
462  return;
465  mq->envelope_tail,
466  mq->current_envelope);
467 
469  "sending message of type %u from queue\n",
470  ntohs(mq->current_envelope->mh->type));
471 
472  mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
473 }
474 
475 
483 void
485 {
486  struct GNUNET_MQ_Envelope *current_envelope;
488 
489  GNUNET_assert(0 < mq->queue_length);
490  mq->queue_length--;
491  mq->in_flight = GNUNET_NO;
492  current_envelope = mq->current_envelope;
493  current_envelope->parent_queue = NULL;
494  mq->current_envelope = NULL;
495  GNUNET_assert(NULL == mq->send_task);
497  if (NULL != (cb = current_envelope->sent_cb))
498  {
499  current_envelope->sent_cb = NULL;
500  cb(current_envelope->sent_cls);
501  }
502  GNUNET_free(current_envelope);
503 }
504 
505 
516 void
518 {
519  struct GNUNET_MQ_Envelope *current_envelope;
521 
522  mq->in_flight = GNUNET_YES;
523  /* call is only valid if we're actually currently sending
524  * a message */
525  current_envelope = mq->current_envelope;
526  GNUNET_assert(NULL != current_envelope);
527  /* can't call cancel from now on anymore */
528  current_envelope->parent_queue = NULL;
529  if (NULL != (cb = current_envelope->sent_cb))
530  {
531  current_envelope->sent_cb = NULL;
532  cb(current_envelope->sent_cls);
533  }
534 }
535 
536 
549 struct GNUNET_MQ_Handle *
552  GNUNET_MQ_CancelImpl cancel,
553  void *impl_state,
554  const struct GNUNET_MQ_MessageHandler *handlers,
556  void *error_handler_cls)
557 {
558  struct GNUNET_MQ_Handle *mq;
559 
560  mq = GNUNET_new(struct GNUNET_MQ_Handle);
561  mq->send_impl = send;
562  mq->destroy_impl = destroy;
563  mq->cancel_impl = cancel;
564  mq->handlers = GNUNET_MQ_copy_handlers(handlers);
567  mq->impl_state = impl_state;
568 
569  return mq;
570 }
571 
572 
580 void
582 {
583  if (NULL == mq->handlers)
584  return;
585  for (unsigned int i = 0; NULL != mq->handlers[i].cb; i++)
586  mq->handlers[i].cls = handlers_cls;
587 }
588 
589 
599 const struct GNUNET_MessageHeader *
601 {
602  GNUNET_assert(NULL != mq->current_envelope);
603  GNUNET_assert(NULL != mq->current_envelope->mh);
604  return mq->current_envelope->mh;
605 }
606 
607 
622 void *
624 {
625  return mq->impl_state;
626 }
627 
628 
629 struct GNUNET_MQ_Envelope *
630 GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
631 {
632  struct GNUNET_MQ_Envelope *ev;
633 
634  ev = GNUNET_malloc(size + sizeof(struct GNUNET_MQ_Envelope));
635  ev->mh = (struct GNUNET_MessageHeader *)&ev[1];
636  ev->mh->size = htons(size);
637  ev->mh->type = htons(type);
638  if (NULL != mhp)
639  *mhp = ev->mh;
640  return ev;
641 }
642 
643 
650 struct GNUNET_MQ_Envelope *
652 {
653  struct GNUNET_MQ_Envelope *mqm;
654  uint16_t size = ntohs(hdr->size);
655 
656  mqm = GNUNET_malloc(sizeof(*mqm) + size);
657  mqm->mh = (struct GNUNET_MessageHeader *)&mqm[1];
658  GNUNET_memcpy(mqm->mh, hdr, size);
659  return mqm;
660 }
661 
662 
672 struct GNUNET_MQ_Envelope *
674  uint16_t base_size,
675  uint16_t type,
676  const struct GNUNET_MessageHeader *nested_mh)
677 {
678  struct GNUNET_MQ_Envelope *mqm;
679  uint16_t size;
680 
681  if (NULL == nested_mh)
682  return GNUNET_MQ_msg_(mhp, base_size, type);
683 
684  size = base_size + ntohs(nested_mh->size);
685 
686  /* check for uint16_t overflow */
687  if (size < base_size)
688  return NULL;
689 
690  mqm = GNUNET_MQ_msg_(mhp, size, type);
691  GNUNET_memcpy((char *)mqm->mh + base_size,
692  nested_mh,
693  ntohs(nested_mh->size));
694 
695  return mqm;
696 }
697 
698 
705 uint32_t
706 GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
707 {
708  uint32_t id;
709 
710  if (NULL == mq->assoc_map)
711  {
713  mq->assoc_id = 1;
714  }
715  id = mq->assoc_id++;
718  mq->assoc_map,
719  id,
720  assoc_data,
722  return id;
723 }
724 
725 
733 void *
734 GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
735 {
736  if (NULL == mq->assoc_map)
737  return NULL;
738  return GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
739 }
740 
741 
749 void *
750 GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
751 {
752  void *val;
753 
754  if (NULL == mq->assoc_map)
755  return NULL;
756  val = GNUNET_CONTAINER_multihashmap32_get(mq->assoc_map, request_id);
758  return val;
759 }
760 
761 
771 void
774  void *cb_cls)
775 {
776  /* allow setting *OR* clearing callback */
777  GNUNET_assert((NULL == ev->sent_cb) || (NULL == cb));
778  ev->sent_cb = cb;
779  ev->sent_cls = cb_cls;
780 }
781 
782 
792 
797 
802 
807 
811  void *cb_cls;
812 };
813 
814 
820 void
822 {
824 
825  if (NULL != mq->destroy_impl)
826  {
827  mq->destroy_impl(mq, mq->impl_state);
828  }
829  if (NULL != mq->send_task)
830  {
832  mq->send_task = NULL;
833  }
834  while (NULL != mq->envelope_head)
835  {
836  struct GNUNET_MQ_Envelope *ev;
837 
838  ev = mq->envelope_head;
839  ev->parent_queue = NULL;
841  GNUNET_assert(0 < mq->queue_length);
842  mq->queue_length--;
844  "MQ destroy drops message of type %u\n",
845  ntohs(ev->mh->type));
846  GNUNET_MQ_discard(ev);
847  }
848  if (NULL != mq->current_envelope)
849  {
850  /* we can only discard envelopes that
851  * are not queued! */
852  mq->current_envelope->parent_queue = NULL;
854  "MQ destroy drops current message of type %u\n",
855  ntohs(mq->current_envelope->mh->type));
857  mq->current_envelope = NULL;
858  GNUNET_assert(0 < mq->queue_length);
859  mq->queue_length--;
860  }
861  GNUNET_assert(0 == mq->queue_length);
862  while (NULL != (dnh = mq->dnh_head))
863  {
864  dnh->cb(dnh->cb_cls);
866  }
867  if (NULL != mq->assoc_map)
868  {
870  mq->assoc_map = NULL;
871  }
873  GNUNET_free(mq);
874 }
875 
876 
877 const struct GNUNET_MessageHeader *
879  uint16_t base_size)
880 {
881  uint16_t whole_size;
882  uint16_t nested_size;
883  const struct GNUNET_MessageHeader *nested_msg;
884 
885  whole_size = ntohs(mh->size);
886  GNUNET_assert(whole_size >= base_size);
887  nested_size = whole_size - base_size;
888  if (0 == nested_size)
889  return NULL;
890  if (nested_size < sizeof(struct GNUNET_MessageHeader))
891  {
892  GNUNET_break_op(0);
893  return NULL;
894  }
895  nested_msg = (const struct GNUNET_MessageHeader *)((char *)mh + base_size);
896  if (ntohs(nested_msg->size) != nested_size)
897  {
898  GNUNET_break_op(0);
899  return NULL;
900  }
901  return nested_msg;
902 }
903 
904 
912 void
914 {
915  struct GNUNET_MQ_Handle *mq = ev->parent_queue;
916 
917  GNUNET_assert(NULL != mq);
918  GNUNET_assert(NULL != mq->cancel_impl);
919 
921 
922  if (mq->current_envelope == ev)
923  {
924  /* complex case, we already started with transmitting
925  the message using the callbacks. */
927  GNUNET_assert(0 < mq->queue_length);
928  mq->queue_length--;
929  mq->cancel_impl(mq, mq->impl_state);
930  /* continue sending the next message, if any */
932  if (NULL != mq->current_envelope)
933  {
935  mq->envelope_tail,
936  mq->current_envelope);
937 
939  "sending canceled message of type %u queue\n",
940  ntohs(ev->mh->type));
941 
942  mq->send_impl(mq, mq->current_envelope->mh, mq->impl_state);
943  }
944  }
945  else
946  {
947  /* simple case, message is still waiting in the queue */
949  GNUNET_assert(0 < mq->queue_length);
950  mq->queue_length--;
951  }
952 
953  if (GNUNET_YES != mq->evacuate_called)
954  {
955  ev->parent_queue = NULL;
956  ev->mh = NULL;
957  /* also frees ev */
958  GNUNET_free(ev);
959  }
960 }
961 
962 
970 struct GNUNET_MQ_Envelope *
972 {
973  return mq->current_envelope;
974 }
975 
976 
983 struct GNUNET_MQ_Envelope *
985 {
986  if (NULL != mq->envelope_tail)
987  return mq->envelope_tail;
988 
989  return mq->current_envelope;
990 }
991 
992 
1001 void
1004 {
1005  env->priority = pp;
1007 }
1008 
1009 
1018 {
1019  struct GNUNET_MQ_Handle *mq = env->parent_queue;
1020 
1021  if (GNUNET_YES == env->have_custom_options)
1022  return env->priority;
1023  if (NULL == mq)
1024  return 0;
1025  return mq->priority;
1026 }
1027 
1028 
1040 {
1042 
1043  ret = GNUNET_MAX(p1 & GNUNET_MQ_PRIORITY_MASK, p2 & GNUNET_MQ_PRIORITY_MASK);
1044  ret |= ((p1 & GNUNET_MQ_PREF_UNRELIABLE) & (p2 & GNUNET_MQ_PREF_UNRELIABLE));
1045  ret |=
1047  ret |=
1049  ret |= ((p1 & GNUNET_MQ_PREF_GOODPUT) & (p2 & GNUNET_MQ_PREF_GOODPUT));
1050  ret |=
1052  return ret;
1053 }
1054 
1055 
1062 void
1065 {
1066  mq->priority = pp;
1067 }
1068 
1069 
1076 const struct GNUNET_MessageHeader *
1078 {
1079  return env->mh;
1080 }
1081 
1082 
1089 const struct GNUNET_MQ_Envelope *
1091 {
1092  return env->next;
1093 }
1094 
1095 
1108  void *cb_cls)
1109 {
1111 
1113  dnh->mq = mq;
1114  dnh->cb = cb;
1115  dnh->cb_cls = cb_cls;
1117  return dnh;
1118 }
1119 
1120 
1126 void
1129 {
1130  struct GNUNET_MQ_Handle *mq = dnh->mq;
1131 
1133  GNUNET_free(dnh);
1134 }
1135 
1136 
1149 void
1151  struct GNUNET_MQ_Envelope **env_tail,
1152  struct GNUNET_MQ_Envelope *env)
1153 {
1154  GNUNET_CONTAINER_DLL_insert(*env_head, *env_tail, env);
1155 }
1156 
1157 
1170 void
1172  struct GNUNET_MQ_Envelope **env_tail,
1173  struct GNUNET_MQ_Envelope *env)
1174 {
1175  GNUNET_CONTAINER_DLL_insert_tail(*env_head, *env_tail, env);
1176 }
1177 
1178 
1191 void
1193  struct GNUNET_MQ_Envelope **env_tail,
1194  struct GNUNET_MQ_Envelope *env)
1195 {
1196  GNUNET_CONTAINER_DLL_remove(*env_head, *env_tail, env);
1197 }
1198 
1199 
1210 struct GNUNET_MQ_MessageHandler *
1212 {
1213  struct GNUNET_MQ_MessageHandler *copy;
1214  unsigned int count;
1215 
1216  if (NULL == handlers)
1217  return NULL;
1218 
1219  count = GNUNET_MQ_count_handlers(handlers);
1220  copy = GNUNET_new_array(count + 1, struct GNUNET_MQ_MessageHandler);
1221  GNUNET_memcpy(copy,
1222  handlers,
1223  count * sizeof(struct GNUNET_MQ_MessageHandler));
1224  return copy;
1225 }
1226 
1227 
1240 struct GNUNET_MQ_MessageHandler *
1242  GNUNET_MQ_MessageCallback agpl_handler,
1243  void *agpl_cls)
1244 {
1245  struct GNUNET_MQ_MessageHandler *copy;
1246  unsigned int count;
1247 
1248  if (NULL == handlers)
1249  return NULL;
1250  count = GNUNET_MQ_count_handlers(handlers);
1251  copy = GNUNET_new_array(count + 2, struct GNUNET_MQ_MessageHandler);
1252  GNUNET_memcpy(copy,
1253  handlers,
1254  count * sizeof(struct GNUNET_MQ_MessageHandler));
1255  copy[count].mv = NULL;
1256  copy[count].cb = agpl_handler;
1257  copy[count].cls = agpl_cls;
1258  copy[count].type = GNUNET_MESSAGE_TYPE_REQUEST_AGPL;
1259  copy[count].expected_size = sizeof(struct GNUNET_MessageHeader);
1260  return copy;
1261 }
1262 
1263 
1270 unsigned int
1272 {
1273  unsigned int i;
1274 
1275  if (NULL == handlers)
1276  return 0;
1277 
1278  for (i = 0; NULL != handlers[i].cb; i++)
1279  ;
1280 
1281  return i;
1282 }
1283 
1284 
1291 const char *
1293 {
1294  switch (type)
1295  {
1297  return "NONE";
1298 
1300  return "BANDWIDTH";
1301 
1303  return "LATENCY";
1304 
1306  return "RELIABILITY";
1307  }
1308  ;
1309  return NULL;
1310 }
1311 
1312 
1313 /* end of mq.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
void GNUNET_MQ_dll_insert_tail(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:1171
struct GNUNET_MQ_MessageHandler * handlers
Handlers array, or NULL if the queue should not receive messages.
Definition: mq.c:88
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_combine_options(enum GNUNET_MQ_PriorityPreferences p1, enum GNUNET_MQ_PriorityPreferences p2)
Combine performance preferences set for different envelopes that are being combined into one larger e...
Definition: mq.c:1038
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_last_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the last envelope in the queue.
Definition: mq.c:984
void GNUNET_MQ_dll_insert_head(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Insert env into the envelope DLL starting at env_head Note that env must not be in any MQ while this ...
Definition: mq.c:1150
Handle we return for callbacks registered to be notified when GNUNET_MQ_destroy() is called on a queu...
Definition: mq.c:787
struct GNUNET_MessageHeader * mh
Actual allocated message header.
Definition: mq.c:50
static void done()
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_copy(const struct GNUNET_MessageHeader *hdr)
Create a new envelope by copying an existing message.
Definition: mq.c:651
static void error_handler(void *cls, enum GNUNET_MQ_Error error)
We encountered an error handling the MQ to the ATS service.
void * impl_state
Implementation-specific state.
Definition: mq.c:109
unsigned int queue_length
Number of entries we have in the envelope-DLL.
Definition: mq.c:174
struct GNUNET_MQ_DestroyNotificationHandle * GNUNET_MQ_destroy_notify(struct GNUNET_MQ_Handle *mq, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Register function to be called whenever mq is being destroyed.
Definition: mq.c:1106
const struct GNUNET_MessageHeader * GNUNET_MQ_env_get_msg(const struct GNUNET_MQ_Envelope *env)
Obtain message contained in envelope.
Definition: mq.c:1077
void GNUNET_MQ_inject_error(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_Error error)
Call the error handler of a message queue with the given error code.
Definition: mq.c:298
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
struct GNUNET_MQ_DestroyNotificationHandle * dnh_tail
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:156
uint16_t expected_size
Expected size of messages of this type.
uint32_t GNUNET_MQ_assoc_add(struct GNUNET_MQ_Handle *mq, void *assoc_data)
Associate the assoc_data in mq with a unique request id.
Definition: mq.c:706
void(* GNUNET_MQ_ErrorHandler)(void *cls, enum GNUNET_MQ_Error error)
Generic error handler, called with the appropriate error code and the same closure specified at the c...
GNUNET_MQ_Error
Error codes for the queue.
int have_custom_options
Did the application call GNUNET_MQ_env_set_options()?
Definition: mq.c:77
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Copy an array of handlers.
Definition: mq.c:1211
int evacuate_called
GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
Definition: mq.c:180
#define GNUNET_MESSAGE_TYPE_REQUEST_AGPL
Message to request source code link.
void GNUNET_MQ_env_set_options(struct GNUNET_MQ_Envelope *env, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific preferences for this envelope.
Definition: mq.c:1002
No preference was expressed.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_NO
Definition: gnunet_common.h:78
int GNUNET_MQ_handle_message(const struct GNUNET_MQ_MessageHandler *handlers, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given h...
Definition: mq.c:227
static struct GNUNET_IDENTITY_Handle * id
Handle to identity service.
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
GNUNET_MQ_PriorityPreferences
Per envelope preferences and priorities.
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static int ret
Final status code.
Definition: gnunet-arm.c:89
const struct GNUNET_MessageHeader * GNUNET_MQ_impl_current(struct GNUNET_MQ_Handle *mq)
Get the message that should currently be sent.
Definition: mq.c:600
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
const struct GNUNET_MQ_Envelope * GNUNET_MQ_env_next(const struct GNUNET_MQ_Envelope *env)
Return next envelope in queue.
Definition: mq.c:1090
int GNUNET_CONTAINER_multihashmap32_remove_all(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Remove all entries for the given key from the map.
struct GNUNET_MQ_Envelope * envelope_tail
Linked list of messages pending to be sent.
Definition: mq.c:134
void(* GNUNET_SCHEDULER_TaskCallback)(void *cls)
Signature of the main function of a task.
GNUNET_MQ_SendImpl send_impl
Actual implementation of message sending, called when a message is added.
Definition: mq.c:94
Flag to indicate that low latency is important.
enum GNUNET_MQ_PriorityPreferences GNUNET_MQ_env_get_options(struct GNUNET_MQ_Envelope *env)
Get application-specific options for this envelope.
Definition: mq.c:1017
void GNUNET_MQ_set_handlers_closure(struct GNUNET_MQ_Handle *mq, void *handlers_cls)
Change the closure argument in all of the handlers of the mq.
Definition: mq.c:581
void GNUNET_MQ_impl_send_in_flight(struct GNUNET_MQ_Handle *mq)
Call the send notification for the current message, but do not try to send the next message until GNU...
Definition: mq.c:517
int in_flight
GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
Definition: mq.c:185
uint16_t type
Type of the message this handler covers, in host byte order.
Flag to indicate that high bandwidth is desired.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
GNUNET_MQ_ErrorHandler error_handler
Callback will be called when an error occurs.
Definition: mq.c:114
GNUNET_MQ_PreferenceKind
Enum defining all known preference categories.
struct GNUNET_MQ_Handle * GNUNET_MQ_queue_for_callbacks(GNUNET_MQ_SendImpl send, GNUNET_MQ_DestroyImpl destroy, GNUNET_MQ_CancelImpl cancel, void *impl_state, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue for the specified handlers.
Definition: mq.c:550
void * cls
Closure for mv and cb.
static struct GNUNET_CADET_MessageHandler handlers[]
Handlers, for diverse services.
GNUNET_SCHEDULER_TaskCallback sent_cb
Called after the message was sent irrevocably.
Definition: mq.c:60
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:772
GNUNET_MQ_MessageValidationCallback mv
Callback to validate a message of the specified type.
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_nested_mh_(struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, const struct GNUNET_MessageHeader *nested_mh)
Implementation of the GNUNET_MQ_msg_nested_mh macro.
Definition: mq.c:673
void * GNUNET_CONTAINER_multihashmap32_get(const struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key)
Given a key find a value in the map matching the key.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
unsigned int GNUNET_MQ_get_length(struct GNUNET_MQ_Handle *mq)
Obtain the current length of the message queue.
Definition: mq.c:333
GNUNET_SCHEDULER_TaskCallback cb
Function to call.
Definition: mq.c:806
#define GNUNET_MAX(a, b)
Definition: gnunet_common.h:82
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1264
GNUNET_MQ_DestroyImpl destroy_impl
Implementation-dependent queue destruction function.
Definition: mq.c:99
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
The preferred transmission for this envelope focuses on maximizing bandwidth.
void * GNUNET_MQ_impl_state(struct GNUNET_MQ_Handle *mq)
Get the implementation state associated with the message queue.
Definition: mq.c:623
void(* GNUNET_MQ_DestroyImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Signature of functions implementing the destruction of a message queue.
GNUNET_MQ_MessageCallback cb
Callback, called every time a new message of the specified type has been receied. ...
void GNUNET_MQ_inject_message(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *mh)
Call the message message handler that was registered for the type of the given message in the given m...
Definition: mq.c:200
We received a message that was malformed and thus could not be passed to its handler.
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this queue by GNUNET_MQ_set_options().
Definition: mq.c:162
struct GNUNET_MQ_MessageHandler * GNUNET_MQ_copy_handlers2(const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_MessageCallback agpl_handler, void *agpl_cls)
Copy an array of handlers, appending AGPL handler.
Definition: mq.c:1241
Internal representation of the hash map.
struct GNUNET_SCHEDULER_Task * send_task
Task to asynchronously run impl_send_continue().
Definition: mq.c:124
struct GNUNET_MQ_Envelope * GNUNET_MQ_msg_(struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
Create a new envelope.
Definition: mq.c:630
struct GNUNET_MQ_DestroyNotificationHandle * dnh_head
Functions to call on queue destruction; kept in a DLL.
Definition: mq.c:151
Message handler for a specific message type.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
struct GNUNET_MQ_Envelope * prev
Messages are stored in a linked list Each queue has its own list of envelopes.
Definition: mq.c:43
void GNUNET_MQ_dll_remove(struct GNUNET_MQ_Envelope **env_head, struct GNUNET_MQ_Envelope **env_tail, struct GNUNET_MQ_Envelope *env)
Remove env from the envelope DLL starting at env_head.
Definition: mq.c:1192
unsigned int GNUNET_MQ_count_handlers(const struct GNUNET_MQ_MessageHandler *handlers)
Count the handlers in a handler array.
Definition: mq.c:1271
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *ev)
Discard the message queue message, free all allocated resources.
Definition: mq.c:319
void * sent_cls
Closure for send_cb.
Definition: mq.c:65
There must only be one value per key; storing a value should fail if a value under the same key alrea...
GNUNET_MQ_CancelImpl cancel_impl
Implementation-dependent send cancel function.
Definition: mq.c:104
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
struct GNUNET_MQ_Handle * mq
Queue to notify about.
Definition: mq.c:801
#define LOG(kind,...)
Definition: mq.c:29
struct GNUNET_MQ_Envelope * next
Messages are stored in a linked list.
Definition: mq.c:37
void * error_handler_cls
Closure for the error handler.
Definition: mq.c:119
uint32_t assoc_id
Next id that should be used for the assoc_map, initialized lazily to a random value together with ass...
Definition: mq.c:169
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
void(* GNUNET_MQ_CancelImpl)(struct GNUNET_MQ_Handle *mq, void *impl_state)
Implementation function that cancels the currently sent message.
static void impl_send_continue(void *cls)
Task run to call the send implementation for the next queued message, if any.
Definition: mq.c:454
Bit mask to apply to extract the priority bits.
void(* GNUNET_MQ_MessageCallback)(void *cls, const struct GNUNET_MessageHeader *msg)
Called when a message has been received.
Handle to a message queue.
Definition: mq.c:84
struct GNUNET_MQ_Handle * parent_queue
Queue the message is queued in, NULL if message is not queued.
Definition: mq.c:55
void GNUNET_MQ_set_options(struct GNUNET_MQ_Handle *mq, enum GNUNET_MQ_PriorityPreferences pp)
Set application-specific default options for this queue.
Definition: mq.c:1063
struct GNUNET_CONTAINER_MultiHashMap32 * assoc_map
Map of associations, lazily allocated.
Definition: mq.c:146
const char * GNUNET_MQ_preference_to_string(enum GNUNET_MQ_PreferenceKind type)
Convert an enum GNUNET_MQ_PreferenceType to a string.
Definition: mq.c:1292
enum GNUNET_MQ_PriorityPreferences priority
Flags that were set for this envelope by GNUNET_MQ_env_set_options().
Definition: mq.c:72
void * GNUNET_MQ_assoc_remove(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Remove the association for a request_id.
Definition: mq.c:750
struct GNUNET_MQ_DestroyNotificationHandle * next
Kept in a DLL.
Definition: mq.c:796
struct GNUNET_MQ_Envelope * current_envelope
Message that is currently scheduled to be sent.
Definition: mq.c:141
void(* GNUNET_MQ_SendImpl)(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MessageHeader *msg, void *impl_state)
Signature of functions implementing the sending functionality of a message queue. ...
void GNUNET_MQ_send_copy(struct GNUNET_MQ_Handle *mq, const struct GNUNET_MQ_Envelope *ev)
Send a copy of a message with the given message queue.
Definition: mq.c:430
The preferred transmission for this envelope foces on minimizing latency.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
Flag to indicate that out-of-order delivery is OK.
void GNUNET_MQ_destroy_notify_cancel(struct GNUNET_MQ_DestroyNotificationHandle *dnh)
Cancel registration from GNUNET_MQ_destroy_notify().
Definition: mq.c:1127
const struct GNUNET_MessageHeader * GNUNET_MQ_extract_nested_mh_(const struct GNUNET_MessageHeader *mh, uint16_t base_size)
Implementation of the #GNUNET_MQ_extract_nexted_mh macro.
Definition: mq.c:878
Flag to indicate that CORKing is acceptable.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Header for all communications.
struct GNUNET_MQ_Envelope * GNUNET_MQ_get_current_envelope(struct GNUNET_MQ_Handle *mq)
Function to obtain the current envelope from within GNUNET_MQ_SendImpl implementations.
Definition: mq.c:971
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:821
#define GNUNET_YES
Definition: gnunet_common.h:77
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:351
Flag to indicate that unreliable delivery is acceptable.
struct GNUNET_MQ_Envelope * GNUNET_MQ_unsent_head(struct GNUNET_MQ_Handle *mq)
Remove the first envelope that has not yet been sent from the message queue and return it...
Definition: mq.c:392
static void destroy(void *cls)
struct GNUNET_MQ_Envelope * GNUNET_MQ_env_copy(struct GNUNET_MQ_Envelope *env)
Function to copy an envelope.
Definition: mq.c:412
void GNUNET_MQ_impl_send_continue(struct GNUNET_MQ_Handle *mq)
Call the send implementation for the next queued message, if any.
Definition: mq.c:484
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:913
struct GNUNET_MQ_DestroyNotificationHandle * prev
Kept in a DLL.
Definition: mq.c:791
struct GNUNET_MQ_Envelope * envelope_head
Linked list of messages pending to be sent.
Definition: mq.c:129
void * cb_cls
Closure for cb.
Definition: mq.c:811
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void * GNUNET_MQ_assoc_get(struct GNUNET_MQ_Handle *mq, uint32_t request_id)
Get the data associated with a request_id in a queue.
Definition: mq.c:734
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956
The preferred transmission for this envelope foces on reliability.