GNUnet  0.19.4
gnunet-service-set.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
26 #include "platform.h"
27 #include "gnunet-service-set.h"
32 
37 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38 
39 
44 {
49 
54 
58  struct Set *source_set;
59 
63  uint32_t cookie;
64 };
65 
66 
71 struct Listener
72 {
76  struct Listener *next;
77 
81  struct Listener *prev;
82 
88  struct Operation *op_head;
89 
95  struct Operation *op_tail;
96 
101  struct ClientState *cs;
102 
107 
112  struct GNUNET_HashCode app_id;
113 
118 };
119 
120 
125 static struct GNUNET_CADET_Handle *cadet;
126 
131 
136 
140 static uint32_t lazy_copy_cookie;
141 
146 
150 static struct Listener *listener_head;
151 
155 static struct Listener *listener_tail;
156 
160 static unsigned int num_clients;
161 
166 static int in_shutdown;
167 
174 static uint32_t suggest_id;
175 
176 
184 static struct Operation *
185 get_incoming (uint32_t id)
186 {
187  for (struct Listener *listener = listener_head; NULL != listener;
189  {
190  for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191  if (op->suggest_id == id)
192  return op;
193  }
194  return NULL;
195 }
196 
197 
203 static void
205 {
206  struct Listener *listener;
207 
209  "Destroying incoming operation %p\n",
210  op);
211  if (NULL != (listener = op->listener))
212  {
213  GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
214  op->listener = NULL;
215  }
216  if (NULL != op->timeout_task)
217  {
218  GNUNET_SCHEDULER_cancel (op->timeout_task);
219  op->timeout_task = NULL;
220  }
222 }
223 
224 
229 {
234 
238  unsigned int min_op_generation;
239 
243  unsigned int max_op_generation;
244 };
245 
246 
256 static int
257 garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
258 {
259  // struct GarbageContext *gc = cls;
260  // struct ElementEntry *ee = value;
261 
262  // if (GNUNET_YES != ee->removed)
263  // return GNUNET_OK;
264  // if ( (gc->max_op_generation < ee->generation_added) ||
265  // (ee->generation_removed > gc->min_op_generation) )
266  // {
267  // GNUNET_assert (GNUNET_YES ==
268  // GNUNET_CONTAINER_multihashmap_remove (gc->map,
269  // key,
270  // ee));
271  // GNUNET_free (ee);
272  // }
273  return GNUNET_OK;
274 }
275 
276 
284 static void
286 {
287  struct GarbageContext gc;
288 
289  gc.min_op_generation = UINT_MAX;
290  gc.max_op_generation = 0;
291  for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
292  {
293  gc.min_op_generation =
294  GNUNET_MIN (gc.min_op_generation, op->generation_created);
295  gc.max_op_generation =
296  GNUNET_MAX (gc.max_op_generation, op->generation_created);
297  }
298  gc.map = set->content->elements;
301  &gc);
302 }
303 
304 
313 static int
314 is_excluded_generation (unsigned int generation,
315  struct GenerationRange *excluded,
316  unsigned int excluded_size)
317 {
318  for (unsigned int i = 0; i < excluded_size; i++)
319  if ((generation >= excluded[i].start) && (generation < excluded[i].end))
320  return GNUNET_YES;
321  return GNUNET_NO;
322 }
323 
324 
334 static int
336  unsigned int query_generation,
337  struct GenerationRange *excluded,
338  unsigned int excluded_size)
339 {
340  struct MutationEvent *mut;
341  int is_present;
342 
343  GNUNET_assert (NULL != ee->mutations);
344  if (GNUNET_YES ==
345  is_excluded_generation (query_generation, excluded, excluded_size))
346  {
347  GNUNET_break (0);
348  return GNUNET_NO;
349  }
350 
351  is_present = GNUNET_NO;
352 
353  /* Could be made faster with binary search, but lists
354  are small, so why bother. */
355  for (unsigned int i = 0; i < ee->mutations_size; i++)
356  {
357  mut = &ee->mutations[i];
358 
359  if (mut->generation > query_generation)
360  {
361  /* The mutation doesn't apply to our generation
362  anymore. We can't break here, since mutations aren't
363  sorted by generation. */
364  continue;
365  }
366 
367  if (GNUNET_YES ==
368  is_excluded_generation (mut->generation, excluded, excluded_size))
369  {
370  /* The generation is excluded (because it belongs to another
371  fork via a lazy copy) and thus mutations aren't considered
372  for membership testing. */
373  continue;
374  }
375 
376  /* This would be an inconsistency in how we manage mutations. */
377  if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
378  GNUNET_assert (0);
379  /* Likewise. */
380  if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
381  GNUNET_assert (0);
382 
383  is_present = mut->added;
384  }
385 
386  return is_present;
387 }
388 
389 
397 int
399 {
400  return is_element_of_generation (ee,
401  op->generation_created,
402  op->set->excluded_generations,
403  op->set->excluded_generations_size);
404 }
405 
406 
420 void
422 {
423  struct Set *set = op->set;
424  struct GNUNET_CADET_Channel *channel;
425 
426  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427  GNUNET_assert (NULL == op->listener);
428  if (NULL != op->state)
429  {
430  set->vt->cancel (op);
431  op->state = NULL;
432  }
433  if (NULL != set)
434  {
436  op->set = NULL;
437  }
438  if (NULL != op->context_msg)
439  {
440  GNUNET_free (op->context_msg);
441  op->context_msg = NULL;
442  }
443  if (NULL != (channel = op->channel))
444  {
445  /* This will free op; called conditionally as this helper function
446  is also called from within the channel disconnect handler. */
447  op->channel = NULL;
449  }
450  if ((NULL != set) && (GNUNET_YES == gc))
452  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453  * there was a channel end handler that will free 'op' on the call stack. */
454 }
455 
456 
465 static void *
466 client_connect_cb (void *cls,
467  struct GNUNET_SERVICE_Client *c,
468  struct GNUNET_MQ_Handle *mq)
469 {
470  struct ClientState *cs;
471 
472  num_clients++;
473  cs = GNUNET_new (struct ClientState);
474  cs->client = c;
475  cs->mq = mq;
476  return cs;
477 }
478 
479 
488 static int
490  const struct GNUNET_HashCode *key,
491  void *value)
492 {
493  struct ElementEntry *ee = value;
494 
495  GNUNET_free (ee->mutations);
496  GNUNET_free (ee);
497  return GNUNET_YES;
498 }
499 
500 
508 static void
510  struct GNUNET_SERVICE_Client *client,
511  void *internal_cls)
512 {
513  struct ClientState *cs = internal_cls;
514  struct Operation *op;
515  struct Listener *listener;
516  struct Set *set;
517 
518  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519  if (NULL != (set = cs->set))
520  {
521  struct SetContent *content = set->content;
522  struct PendingMutation *pm;
523  struct PendingMutation *pm_current;
524  struct LazyCopyRequest *lcr;
525 
526  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527  /* Destroy pending set operations */
528  while (NULL != set->ops_head)
530 
531  /* Destroy operation-specific state */
532  GNUNET_assert (NULL != set->state);
533  set->vt->destroy_set (set->state);
534  set->state = NULL;
535 
536  /* Clean up ongoing iterations */
537  if (NULL != set->iter)
538  {
540  set->iter = NULL;
541  set->iteration_id++;
542  }
543 
544  /* discard any pending mutations that reference this set */
545  pm = content->pending_mutations_head;
546  while (NULL != pm)
547  {
548  pm_current = pm;
549  pm = pm->next;
550  if (pm_current->set == set)
551  {
553  content->pending_mutations_tail,
554  pm_current);
555  GNUNET_free (pm_current);
556  }
557  }
558 
559  /* free set content (or at least decrement RC) */
560  set->content = NULL;
561  GNUNET_assert (0 != content->refcount);
562  content->refcount--;
563  if (0 == content->refcount)
564  {
565  GNUNET_assert (NULL != content->elements);
568  NULL);
570  content->elements = NULL;
571  GNUNET_free (content);
572  }
574  set->excluded_generations = NULL;
575 
576  /* remove set from pending copy requests */
577  lcr = lazy_copy_head;
578  while (NULL != lcr)
579  {
580  struct LazyCopyRequest *lcr_current = lcr;
581 
582  lcr = lcr->next;
583  if (lcr_current->source_set == set)
584  {
587  lcr_current);
588  GNUNET_free (lcr_current);
589  }
590  }
591  GNUNET_free (set);
592  }
593 
594  if (NULL != (listener = cs->listener))
595  {
596  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
598  listener->open_port = NULL;
599  while (NULL != (op = listener->op_head))
600  {
602  "Destroying incoming operation `%u' from peer `%s'\n",
603  (unsigned int) op->client_request_id,
604  GNUNET_i2s (&op->peer));
606  }
608  GNUNET_free (listener);
609  }
610  GNUNET_free (cs);
611  num_clients--;
612  if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613  {
614  if (NULL != cadet)
615  {
617  cadet = NULL;
618  }
619  }
620 }
621 
622 
631 static int
633 {
634  struct Operation *op = cls;
635  struct Listener *listener = op->listener;
636  const struct GNUNET_MessageHeader *nested_context;
637 
638  /* double operation request */
639  if (0 != op->suggest_id)
640  {
641  GNUNET_break_op (0);
642  return GNUNET_SYSERR;
643  }
644  /* This should be equivalent to the previous condition, but can't hurt to check twice */
645  if (NULL == op->listener)
646  {
647  GNUNET_break (0);
648  return GNUNET_SYSERR;
649  }
650  if (listener->operation !=
651  (enum GNUNET_SET_OperationType) ntohl (msg->operation))
652  {
653  GNUNET_break_op (0);
654  return GNUNET_SYSERR;
655  }
656  nested_context = GNUNET_MQ_extract_nested_mh (msg);
657  if ((NULL != nested_context) &&
658  (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
659  {
660  GNUNET_break_op (0);
661  return GNUNET_SYSERR;
662  }
663  return GNUNET_OK;
664 }
665 
666 
684 static void
686 {
687  struct Operation *op = cls;
688  struct Listener *listener = op->listener;
689  const struct GNUNET_MessageHeader *nested_context;
690  struct GNUNET_MQ_Envelope *env;
691  struct GNUNET_SET_RequestMessage *cmsg;
692 
693  nested_context = GNUNET_MQ_extract_nested_mh (msg);
694  /* Make a copy of the nested_context (application-specific context
695  information that is opaque to set) so we can pass it to the
696  listener later on */
697  if (NULL != nested_context)
698  op->context_msg = GNUNET_copy_message (nested_context);
699  op->remote_element_count = ntohl (msg->element_count);
700  GNUNET_log (
702  "Received P2P operation request (op %u, port %s) for active listener\n",
703  (uint32_t) ntohl (msg->operation),
704  GNUNET_h2s (&op->listener->app_id));
705  GNUNET_assert (0 == op->suggest_id);
706  if (0 == suggest_id)
707  suggest_id++;
708  op->suggest_id = suggest_id++;
709  GNUNET_assert (NULL != op->timeout_task);
710  GNUNET_SCHEDULER_cancel (op->timeout_task);
711  op->timeout_task = NULL;
714  op->context_msg);
715  GNUNET_log (
717  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718  op->suggest_id,
719  listener,
720  listener->cs);
721  cmsg->accept_id = htonl (op->suggest_id);
722  cmsg->peer_id = op->peer;
723  GNUNET_MQ_send (listener->cs->mq, env);
724  /* NOTE: GNUNET_CADET_receive_done() will be called in
725  #handle_client_accept() */
726 }
727 
728 
735 static void
736 execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
737 {
738  struct GNUNET_SET_Element el;
739  struct ElementEntry *ee;
740  struct GNUNET_HashCode hash;
741 
742  GNUNET_assert (GNUNET_MESSAGE_TYPE_SET_ADD == ntohs (msg->header.type));
743  el.size = ntohs (msg->header.size) - sizeof(*msg);
744  el.data = &msg[1];
745  el.element_type = ntohs (msg->element_type);
746  GNUNET_SET_element_hash (&el, &hash);
748  if (NULL == ee)
749  {
751  "Client inserts element %s of size %u\n",
752  GNUNET_h2s (&hash),
753  el.size);
754  ee = GNUNET_malloc (el.size + sizeof(*ee));
755  ee->element.size = el.size;
756  GNUNET_memcpy (&ee[1], el.data, el.size);
757  ee->element.data = &ee[1];
758  ee->element.element_type = el.element_type;
759  ee->remote = GNUNET_NO;
760  ee->mutations = NULL;
761  ee->mutations_size = 0;
762  ee->element_hash = hash;
765  set->content->elements,
766  &ee->element_hash,
767  ee,
769  }
770  else if (GNUNET_YES ==
772  set->current_generation,
775  {
777  "Client inserted element %s of size %u twice (ignored)\n",
778  GNUNET_h2s (&hash),
779  el.size);
780 
781  /* same element inserted twice */
782  return;
783  }
784 
785  {
786  struct MutationEvent mut = { .generation = set->current_generation,
787  .added = GNUNET_YES };
789  }
790  set->vt->add (set->state, ee);
791 }
792 
793 
800 static void
801 execute_remove (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
802 {
803  struct GNUNET_SET_Element el;
804  struct ElementEntry *ee;
805  struct GNUNET_HashCode hash;
806 
808  el.size = ntohs (msg->header.size) - sizeof(*msg);
809  el.data = &msg[1];
810  el.element_type = ntohs (msg->element_type);
811  GNUNET_SET_element_hash (&el, &hash);
813  if (NULL == ee)
814  {
815  /* Client tried to remove non-existing element. */
817  "Client removes non-existing element of size %u\n",
818  el.size);
819  return;
820  }
822  set->current_generation,
825  {
826  /* Client tried to remove element twice */
828  "Client removed element of size %u twice (ignored)\n",
829  el.size);
830  return;
831  }
832  else
833  {
834  struct MutationEvent mut = { .generation = set->current_generation,
835  .added = GNUNET_NO };
836 
838  "Client removes element of size %u\n",
839  el.size);
840 
842  }
843  set->vt->remove (set->state, ee);
844 }
845 
846 
853 static void
854 execute_mutation (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
855 {
856  switch (ntohs (msg->header.type))
857  {
859  execute_add (set, msg);
860  break;
861 
863  execute_remove (set, msg);
864  break;
865 
866  default:
867  GNUNET_break (0);
868  }
869 }
870 
871 
878 static void
880 {
881  struct PendingMutation *pm;
882 
883  if (0 != set->content->iterator_count)
884  return; /* still cannot do this */
885  while (NULL != (pm = set->content->pending_mutations_head))
886  {
889  pm);
891  "Executing pending mutation on %p.\n",
892  pm->set);
893  execute_mutation (pm->set, pm->msg);
894  GNUNET_free (pm->msg);
895  GNUNET_free (pm);
896  }
897 }
898 
899 
913 static void
915 {
916  int ret;
917  struct ElementEntry *ee;
918  struct GNUNET_MQ_Envelope *ev;
920 
921  GNUNET_assert (NULL != set->iter);
922  do
923  {
925  NULL,
926  (const void **) &ee);
927  if (GNUNET_NO == ret)
928  {
929  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
932  set->iter = NULL;
933  set->iteration_id++;
935  set->content->iterator_count--;
937  GNUNET_MQ_send (set->cs->mq, ev);
938  return;
939  }
940  GNUNET_assert (NULL != ee);
941  }
942  while (GNUNET_NO ==
944  set->iter_generation,
948  "Sending iteration element on %p.\n",
949  set);
950  ev = GNUNET_MQ_msg_extra (msg,
951  ee->element.size,
953  GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954  msg->element_type = htons (ee->element.element_type);
955  msg->iteration_id = htons (set->iteration_id);
956  GNUNET_MQ_send (set->cs->mq, ev);
957 }
958 
959 
969 static void
970 handle_client_iterate (void *cls, const struct GNUNET_MessageHeader *m)
971 {
972  struct ClientState *cs = cls;
973  struct Set *set;
974 
975  if (NULL == (set = cs->set))
976  {
977  /* attempt to iterate over a non existing set */
978  GNUNET_break (0);
980  return;
981  }
982  if (NULL != set->iter)
983  {
984  /* Only one concurrent iterate-action allowed per set */
985  GNUNET_break (0);
987  return;
988  }
990  "Iterating set %p in gen %u with %u content elements\n",
991  (void *) set,
992  set->current_generation,
995  set->content->iterator_count++;
996  set->iter =
999  send_client_element (set);
1000 }
1001 
1002 
1011 static void
1013 {
1014  struct ClientState *cs = cls;
1015  struct Set *set;
1016 
1018  "Client created new set (operation %u)\n",
1019  (uint32_t) ntohl (msg->operation));
1020  if (NULL != cs->set)
1021  {
1022  /* There can only be one set per client */
1023  GNUNET_break (0);
1025  return;
1026  }
1027  set = GNUNET_new (struct Set);
1028  switch (ntohl (msg->operation))
1029  {
1031  set->vt = _GSS_intersection_vt ();
1032  break;
1033 
1035  set->vt = _GSS_union_vt ();
1036  break;
1037 
1038  default:
1039  GNUNET_free (set);
1040  GNUNET_break (0);
1042  return;
1043  }
1044  set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045  set->state = set->vt->create ();
1046  if (NULL == set->state)
1047  {
1048  /* initialization failed (i.e. out of memory) */
1049  GNUNET_free (set);
1051  return;
1052  }
1053  set->content = GNUNET_new (struct SetContent);
1054  set->content->refcount = 1;
1056  set->cs = cs;
1057  cs->set = set;
1059 }
1060 
1061 
1070 static void
1072 {
1073  struct Operation *op = cls;
1074 
1075  op->timeout_task = NULL;
1077  "Remote peer's incoming request timed out\n");
1078  incoming_destroy (op);
1079 }
1080 
1081 
1098 static void *
1099 channel_new_cb (void *cls,
1100  struct GNUNET_CADET_Channel *channel,
1101  const struct GNUNET_PeerIdentity *source)
1102 {
1103  struct Listener *listener = cls;
1104  struct Operation *op;
1105 
1106  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1107  op = GNUNET_new (struct Operation);
1108  op->listener = listener;
1109  op->peer = *source;
1110  op->channel = channel;
1111  op->mq = GNUNET_CADET_get_mq (op->channel);
1115  op);
1117  return op;
1118 }
1119 
1120 
1137 static void
1138 channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1139 {
1140  struct Operation *op = channel_ctx;
1141 
1142  op->channel = NULL;
1144 }
1145 
1146 
1152 void
1154 {
1155  struct GNUNET_CADET_Channel *channel;
1156 
1157  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1158  if (NULL != (channel = op->channel))
1159  {
1160  /* This will free op; called conditionally as this helper function
1161  is also called from within the channel disconnect handler. */
1162  op->channel = NULL;
1163  GNUNET_CADET_channel_destroy (channel);
1164  }
1165  if (NULL != op->listener)
1166  {
1167  incoming_destroy (op);
1168  return;
1169  }
1170  if (NULL != op->set)
1171  op->set->vt->channel_death (op);
1172  else
1174  GNUNET_free (op);
1175 }
1176 
1177 
1192 static void
1194  const struct GNUNET_CADET_Channel *channel,
1195  int window_size)
1196 {
1197  /* FIXME: not implemented, we could do flow control here... */
1198 }
1199 
1200 
1207 static void
1209 {
1210  struct ClientState *cs = cls;
1211  struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1212  { GNUNET_MQ_hd_var_size (incoming_msg,
1214  struct OperationRequestMessage,
1215  NULL),
1216  GNUNET_MQ_hd_var_size (union_p2p_ibf,
1218  struct IBFMessage,
1219  NULL),
1220  GNUNET_MQ_hd_var_size (union_p2p_elements,
1223  NULL),
1224  GNUNET_MQ_hd_var_size (union_p2p_offer,
1226  struct GNUNET_MessageHeader,
1227  NULL),
1228  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1230  struct InquiryMessage,
1231  NULL),
1232  GNUNET_MQ_hd_var_size (union_p2p_demand,
1234  struct GNUNET_MessageHeader,
1235  NULL),
1236  GNUNET_MQ_hd_fixed_size (union_p2p_done,
1238  struct GNUNET_MessageHeader,
1239  NULL),
1240  GNUNET_MQ_hd_fixed_size (union_p2p_over,
1242  struct GNUNET_MessageHeader,
1243  NULL),
1244  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1246  struct GNUNET_MessageHeader,
1247  NULL),
1248  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1250  struct GNUNET_MessageHeader,
1251  NULL),
1252  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1254  struct StrataEstimatorMessage,
1255  NULL),
1256  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1258  struct StrataEstimatorMessage,
1259  NULL),
1260  GNUNET_MQ_hd_var_size (union_p2p_full_element,
1263  NULL),
1264  GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1267  NULL),
1268  GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1270  struct BFMessage,
1271  NULL),
1272  GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1274  struct IntersectionDoneMessage,
1275  NULL),
1276  GNUNET_MQ_handler_end () };
1277  struct Listener *listener;
1278 
1279  if (NULL != cs->listener)
1280  {
1281  /* max. one active listener per client! */
1282  GNUNET_break (0);
1284  return;
1285  }
1286  listener = GNUNET_new (struct Listener);
1287  listener->cs = cs;
1288  cs->listener = listener;
1289  listener->app_id = msg->app_id;
1290  listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1293  "New listener created (op %u, port %s)\n",
1294  listener->operation,
1295  GNUNET_h2s (&listener->app_id));
1296  listener->open_port = GNUNET_CADET_open_port (cadet,
1297  &msg->app_id,
1298  &channel_new_cb,
1299  listener,
1301  &channel_end_cb,
1302  cadet_handlers);
1304 }
1305 
1306 
1314 static void
1316 {
1317  struct ClientState *cs = cls;
1318  struct Operation *op;
1319 
1320  op = get_incoming (ntohl (msg->accept_reject_id));
1321  if (NULL == op)
1322  {
1323  /* no matching incoming operation for this reject;
1324  could be that the other peer already disconnected... */
1326  "Client rejected unknown operation %u\n",
1327  (unsigned int) ntohl (msg->accept_reject_id));
1329  return;
1330  }
1332  "Peer request (op %u, app %s) rejected by client\n",
1333  op->listener->operation,
1334  GNUNET_h2s (&cs->listener->app_id));
1337 }
1338 
1339 
1346 static int
1348 {
1349  /* NOTE: Technically, we should probably check with the
1350  block library whether the element we are given is well-formed */
1351  return GNUNET_OK;
1352 }
1353 
1354 
1361 static void
1363 {
1364  struct ClientState *cs = cls;
1365  struct Set *set;
1366 
1367  if (NULL == (set = cs->set))
1368  {
1369  /* client without a set requested an operation */
1370  GNUNET_break (0);
1372  return;
1373  }
1375 
1376  if (0 != set->content->iterator_count)
1377  {
1378  struct PendingMutation *pm;
1379 
1380  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1381  pm = GNUNET_new (struct PendingMutation);
1382  pm->msg =
1383  (struct GNUNET_SET_ElementMessage *) GNUNET_copy_message (&msg->header);
1384  pm->set = set;
1387  pm);
1388  return;
1389  }
1390  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1391  execute_mutation (set, msg);
1392 }
1393 
1394 
1401 static void
1403 {
1404  struct GenerationRange r;
1405 
1406  if (set->current_generation == set->content->latest_generation)
1407  {
1408  set->content->latest_generation++;
1409  set->current_generation++;
1410  return;
1411  }
1412 
1414 
1415  r.start = set->current_generation + 1;
1416  r.end = set->content->latest_generation + 1;
1417  set->content->latest_generation = r.end;
1418  set->current_generation = r.end;
1421  r);
1422 }
1423 
1424 
1434 static int
1436 {
1437  /* FIXME: suboptimal, even if the context below could be NULL,
1438  there are malformed messages this does not check for... */
1439  return GNUNET_OK;
1440 }
1441 
1442 
1451 static void
1453 {
1454  struct ClientState *cs = cls;
1455  struct Operation *op = GNUNET_new (struct Operation);
1456  const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1457  { GNUNET_MQ_hd_var_size (incoming_msg,
1459  struct OperationRequestMessage,
1460  op),
1461  GNUNET_MQ_hd_var_size (union_p2p_ibf,
1463  struct IBFMessage,
1464  op),
1465  GNUNET_MQ_hd_var_size (union_p2p_elements,
1468  op),
1469  GNUNET_MQ_hd_var_size (union_p2p_offer,
1471  struct GNUNET_MessageHeader,
1472  op),
1473  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1475  struct InquiryMessage,
1476  op),
1477  GNUNET_MQ_hd_var_size (union_p2p_demand,
1479  struct GNUNET_MessageHeader,
1480  op),
1481  GNUNET_MQ_hd_fixed_size (union_p2p_done,
1483  struct GNUNET_MessageHeader,
1484  op),
1485  GNUNET_MQ_hd_fixed_size (union_p2p_over,
1487  struct GNUNET_MessageHeader,
1488  op),
1489  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1491  struct GNUNET_MessageHeader,
1492  op),
1493  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1495  struct GNUNET_MessageHeader,
1496  op),
1497  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1499  struct StrataEstimatorMessage,
1500  op),
1501  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1503  struct StrataEstimatorMessage,
1504  op),
1505  GNUNET_MQ_hd_var_size (union_p2p_full_element,
1508  op),
1509  GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1512  op),
1513  GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1515  struct BFMessage,
1516  op),
1517  GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1519  struct IntersectionDoneMessage,
1520  op),
1521  GNUNET_MQ_handler_end () };
1522  struct Set *set;
1523  const struct GNUNET_MessageHeader *context;
1524 
1525  if (NULL == (set = cs->set))
1526  {
1527  GNUNET_break (0);
1528  GNUNET_free (op);
1530  return;
1531  }
1533  op->peer = msg->target_peer;
1534  op->result_mode = ntohl (msg->result_mode);
1535  op->client_request_id = ntohl (msg->request_id);
1536  op->byzantine = msg->byzantine;
1537  op->byzantine_lower_bound = msg->byzantine_lower_bound;
1538  op->force_full = msg->force_full;
1539  op->force_delta = msg->force_delta;
1541 
1542  /* Advance generation values, so that
1543  mutations won't interfere with the running operation. */
1544  op->set = set;
1545  op->generation_created = set->current_generation;
1546  advance_generation (set);
1549  "Creating new CADET channel to port %s for set operation type %u\n",
1550  GNUNET_h2s (&msg->app_id),
1551  set->operation);
1552  op->channel = GNUNET_CADET_channel_create (cadet,
1553  op,
1554  &msg->target_peer,
1555  &msg->app_id,
1557  &channel_end_cb,
1558  cadet_handlers);
1559  op->mq = GNUNET_CADET_get_mq (op->channel);
1560  op->state = set->vt->evaluate (op, context);
1561  if (NULL == op->state)
1562  {
1563  GNUNET_break (0);
1565  return;
1566  }
1568 }
1569 
1570 
1579 static void
1581 {
1582  struct ClientState *cs = cls;
1583  struct Set *set;
1584 
1585  if (NULL == (set = cs->set))
1586  {
1587  /* client without a set acknowledged receiving a value */
1588  GNUNET_break (0);
1590  return;
1591  }
1592  if (NULL == set->iter)
1593  {
1594  /* client sent an ack, but we were not expecting one (as
1595  set iteration has finished) */
1596  GNUNET_break (0);
1598  return;
1599  }
1601  if (ntohl (ack->send_more))
1602  {
1603  send_client_element (set);
1604  }
1605  else
1606  {
1608  set->iter = NULL;
1609  set->iteration_id++;
1610  }
1611 }
1612 
1613 
1620 static void
1622  const struct GNUNET_MessageHeader *mh)
1623 {
1624  struct ClientState *cs = cls;
1625  struct Set *set;
1626  struct LazyCopyRequest *cr;
1627  struct GNUNET_MQ_Envelope *ev;
1628  struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1629 
1630  if (NULL == (set = cs->set))
1631  {
1632  /* client without a set requested an operation */
1633  GNUNET_break (0);
1635  return;
1636  }
1638  "Client requested creation of lazy copy\n");
1639  cr = GNUNET_new (struct LazyCopyRequest);
1640  cr->cookie = ++lazy_copy_cookie;
1641  cr->source_set = set;
1644  resp_msg->cookie = cr->cookie;
1645  GNUNET_MQ_send (set->cs->mq, ev);
1647 }
1648 
1649 
1656 static void
1658  void *cls,
1659  const struct GNUNET_SET_CopyLazyConnectMessage *msg)
1660 {
1661  struct ClientState *cs = cls;
1662  struct LazyCopyRequest *cr;
1663  struct Set *set;
1664  int found;
1665 
1666  if (NULL != cs->set)
1667  {
1668  /* There can only be one set per client */
1669  GNUNET_break (0);
1671  return;
1672  }
1673  found = GNUNET_NO;
1674  for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1675  {
1676  if (cr->cookie == msg->cookie)
1677  {
1678  found = GNUNET_YES;
1679  break;
1680  }
1681  }
1682  if (GNUNET_NO == found)
1683  {
1684  /* client asked for copy with cookie we don't know */
1685  GNUNET_break (0);
1687  return;
1688  }
1691  "Client %p requested use of lazy copy\n",
1692  cs);
1693  set = GNUNET_new (struct Set);
1694  switch (cr->source_set->operation)
1695  {
1697  set->vt = _GSS_intersection_vt ();
1698  break;
1699 
1701  set->vt = _GSS_union_vt ();
1702  break;
1703 
1704  default:
1705  GNUNET_assert (0);
1706  return;
1707  }
1708 
1709  if (NULL == set->vt->copy_state)
1710  {
1711  /* Lazy copy not supported for this set operation */
1712  GNUNET_break (0);
1713  GNUNET_free (set);
1714  GNUNET_free (cr);
1716  return;
1717  }
1718 
1719  set->operation = cr->source_set->operation;
1720  set->state = set->vt->copy_state (cr->source_set->state);
1721  set->content = cr->source_set->content;
1722  set->content->refcount++;
1723 
1726  set->excluded_generations =
1729  * sizeof(struct GenerationRange));
1730 
1731  /* Advance the generation of the new set, so that mutations of
1732  the cloned set and the source set are independent. */
1733  advance_generation (set);
1734  set->cs = cs;
1735  cs->set = set;
1736  GNUNET_free (cr);
1738 }
1739 
1740 
1747 static void
1749 {
1750  struct ClientState *cs = cls;
1751  struct Set *set;
1752  struct Operation *op;
1753  int found;
1754 
1755  if (NULL == (set = cs->set))
1756  {
1757  /* client without a set requested an operation */
1758  GNUNET_break (0);
1760  return;
1761  }
1762  found = GNUNET_NO;
1763  for (op = set->ops_head; NULL != op; op = op->next)
1764  {
1765  if (op->client_request_id == ntohl (msg->request_id))
1766  {
1767  found = GNUNET_YES;
1768  break;
1769  }
1770  }
1771  if (GNUNET_NO == found)
1772  {
1773  /* It may happen that the operation was already destroyed due to
1774  * the other peer disconnecting. The client may not know about this
1775  * yet and try to cancel the (just barely non-existent) operation.
1776  * So this is not a hard error.
1778  "Client canceled non-existent op %u\n",
1779  (uint32_t) ntohl (msg->request_id));
1780  }
1781  else
1782  {
1784  "Client requested cancel for op %u\n",
1785  (uint32_t) ntohl (msg->request_id));
1787  }
1789 }
1790 
1791 
1800 static void
1802 {
1803  struct ClientState *cs = cls;
1804  struct Set *set;
1805  struct Operation *op;
1806  struct GNUNET_SET_ResultMessage *result_message;
1807  struct GNUNET_MQ_Envelope *ev;
1808  struct Listener *listener;
1809 
1810  if (NULL == (set = cs->set))
1811  {
1812  /* client without a set requested to accept */
1813  GNUNET_break (0);
1815  return;
1816  }
1817  op = get_incoming (ntohl (msg->accept_reject_id));
1818  if (NULL == op)
1819  {
1820  /* It is not an error if the set op does not exist -- it may
1821  * have been destroyed when the partner peer disconnected. */
1822  GNUNET_log (
1824  "Client %p accepted request %u of listener %p that is no longer active\n",
1825  cs,
1826  ntohl (msg->accept_reject_id),
1827  cs->listener);
1828  ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1829  result_message->request_id = msg->request_id;
1830  result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1831  GNUNET_MQ_send (set->cs->mq, ev);
1833  return;
1834  }
1836  "Client accepting request %u\n",
1837  (uint32_t) ntohl (msg->accept_reject_id));
1838  listener = op->listener;
1839  op->listener = NULL;
1840  GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1841  op->set = set;
1843  op->client_request_id = ntohl (msg->request_id);
1844  op->result_mode = ntohl (msg->result_mode);
1845  op->byzantine = msg->byzantine;
1846  op->byzantine_lower_bound = msg->byzantine_lower_bound;
1847  op->force_full = msg->force_full;
1848  op->force_delta = msg->force_delta;
1849 
1850  /* Advance generation values, so that future mutations do not
1851  interfere with the running operation. */
1852  op->generation_created = set->current_generation;
1853  advance_generation (set);
1854  GNUNET_assert (NULL == op->state);
1855  op->state = set->vt->accept (op);
1856  if (NULL == op->state)
1857  {
1858  GNUNET_break (0);
1860  return;
1861  }
1862  /* Now allow CADET to continue, as we did not do this in
1863  #handle_incoming_msg (as we wanted to first see if the
1864  local client would accept the request). */
1865  GNUNET_CADET_receive_done (op->channel);
1867 }
1868 
1869 
1875 static void
1876 shutdown_task (void *cls)
1877 {
1878  /* Delay actual shutdown to allow service to disconnect clients */
1880  if (0 == num_clients)
1881  {
1882  if (NULL != cadet)
1883  {
1885  cadet = NULL;
1886  }
1887  }
1889  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1890 }
1891 
1892 
1901 static void
1902 run (void *cls,
1903  const struct GNUNET_CONFIGURATION_Handle *cfg,
1905 {
1906  /* FIXME: need to modify SERVICE (!) API to allow
1907  us to run a shutdown task *after* clients were
1908  forcefully disconnected! */
1912  if (NULL == cadet)
1913  {
1915  _ ("Could not connect to CADET service\n"));
1917  return;
1918  }
1919 }
1920 
1921 
1926  "set",
1928  &run,
1931  NULL,
1932  GNUNET_MQ_hd_fixed_size (client_accept,
1934  struct GNUNET_SET_AcceptMessage,
1935  NULL),
1936  GNUNET_MQ_hd_fixed_size (client_iter_ack,
1939  NULL),
1940  GNUNET_MQ_hd_var_size (client_mutation,
1943  NULL),
1944  GNUNET_MQ_hd_fixed_size (client_create_set,
1946  struct GNUNET_SET_CreateMessage,
1947  NULL),
1948  GNUNET_MQ_hd_fixed_size (client_iterate,
1950  struct GNUNET_MessageHeader,
1951  NULL),
1952  GNUNET_MQ_hd_var_size (client_evaluate,
1955  NULL),
1956  GNUNET_MQ_hd_fixed_size (client_listen,
1958  struct GNUNET_SET_ListenMessage,
1959  NULL),
1960  GNUNET_MQ_hd_fixed_size (client_reject,
1962  struct GNUNET_SET_RejectMessage,
1963  NULL),
1964  GNUNET_MQ_hd_var_size (client_mutation,
1967  NULL),
1968  GNUNET_MQ_hd_fixed_size (client_cancel,
1970  struct GNUNET_SET_CancelMessage,
1971  NULL),
1972  GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1974  struct GNUNET_MessageHeader,
1975  NULL),
1976  GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1979  NULL),
1981 
1982 
1983 /* end of gnunet-service-set.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:104
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
static int garbage_collect_cb(void *cls, const struct GNUNET_HashCode *key, void *value)
Function invoked to check if an element can be removed from the set's history because it is no longer...
void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void execute_delayed_mutations(struct Set *set)
Execute mutations that were delayed on a set because of pending operations.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static int check_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static int check_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void collect_generation_garbage(struct Set *set)
Collect and destroy elements that are not needed anymore, because their lifetime (as determined by th...
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_client_iter_ack(void *cls, const struct GNUNET_SET_IterAckMessage *ack)
Handle an ack from a client, and send the next element.
static void handle_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void send_client_element(struct Set *set)
Send the next element of a set to the set's client.
static unsigned int num_clients
Number of active clients.
static void execute_mutation(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Perform a mutation on a set as specified by the msg.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void handle_client_iterate(void *cls, const struct GNUNET_MessageHeader *m)
Called when a client wants to iterate the elements of a set.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static void execute_add(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Add an element to set as specified by msg.
static void handle_client_copy_lazy_prepare(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request from the client to copy a set.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static int is_element_of_generation(struct ElementEntry *ee, unsigned int query_generation, struct GenerationRange *excluded, unsigned int excluded_size)
Is element ee part of the set during query_generation?
static int is_excluded_generation(unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size)
Is generation in the range of exclusions?
static uint32_t lazy_copy_cookie
Generator for unique cookie we set per lazy copy request.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static void execute_remove(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Remove an element from set as specified by msg.
static struct LazyCopyRequest * lazy_copy_head
DLL of lazy copy requests by this client.
static void handle_client_copy_lazy_connect(void *cls, const struct GNUNET_SET_CopyLazyConnectMessage *msg)
Handle a request from the client to connect to a copy of a set.
static void handle_client_accept(void *cls, const struct GNUNET_SET_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static struct LazyCopyRequest * lazy_copy_tail
DLL of lazy copy requests by this client.
static void handle_client_reject(void *cls, const struct GNUNET_SET_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static void handle_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_cancel(void *cls, const struct GNUNET_SET_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SET_ACCEPT, struct GNUNET_SET_AcceptMessage, NULL), GNUNET_MQ_hd_fixed_size(client_iter_ack, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, struct GNUNET_SET_IterAckMessage, NULL), GNUNET_MQ_hd_var_size(client_mutation, GNUNET_MESSAGE_TYPE_SET_ADD, struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SET_CREATE, struct GNUNET_SET_CreateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_iterate, GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SET_EVALUATE, struct GNUNET_SET_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SET_LISTEN, struct GNUNET_SET_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SET_REJECT, struct GNUNET_SET_RejectMessage, NULL), GNUNET_MQ_hd_var_size(client_mutation, GNUNET_MESSAGE_TYPE_SET_REMOVE, struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SET_CANCEL, struct GNUNET_SET_CancelMessage, NULL), GNUNET_MQ_hd_fixed_size(client_copy_lazy_prepare, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_fixed_size(client_copy_lazy_connect, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT, struct GNUNET_SET_CopyLazyConnectMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
void _GSS_operation_destroy(struct Operation *op, int gc)
Destroy the given operation.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void handle_client_listen(void *cls, const struct GNUNET_SET_ListenMessage *msg)
Called when a client wants to create a new listener.
static void handle_client_create_set(void *cls, const struct GNUNET_SET_CreateMessage *msg)
Called when a client wants to create a new set.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
common components for the implementation of the different set operations
const struct SetVT * _GSS_intersection_vt(void)
Get the table with implementing functions for set intersection.
const struct SetVT * _GSS_union_vt(void)
Get the table with implementing functions for set union.
two-peer set operations
Peer-to-Peer messages for gnunet set.
two-peer set operations
static struct GNUNET_TRANSPORT_PluginMonitor * pm
Handle if we are monitoring plugin session activity.
API to create, modify and access statistics.
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator's position.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_MAX(a, b)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MIN(a, b)
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_array_append(arr, len, element)
Append an element to an array (growing the array by one).
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_free(ptr)
Wrapper around free.
#define GNUNET_memdup(buf, size)
Allocate and initialize a block of memory.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:62
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:86
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:77
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_EVALUATE
Evaluate a set operation.
#define GNUNET_MESSAGE_TYPE_SET_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE
Give the client an ID for connecting to the set's copy.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SET_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SET_REQUEST
Notify the client of a request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE
Ask the set service to prepare a copy of a set.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SET_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
Request a set operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
Intersection operation is done.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT
Element result for the iterating client.
#define GNUNET_MESSAGE_TYPE_SET_ACCEPT
Accept a set request.
#define GNUNET_MESSAGE_TYPE_SET_RESULT
Result of a set operation, sent by the service to the client.
#define GNUNET_MESSAGE_TYPE_SET_REMOVE
Remove element from set.
#define GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST
Start iteration over set elements.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
Information about the element count for intersection.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SET_ITER_ACK
Acknowledge result from iteration.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT
Sent by the client to the server to connect to an existing, lazily copied set.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
Bloom filter message for intersection exchange started by Bob.
#define GNUNET_MESSAGE_TYPE_SET_ITER_DONE
Iteration end marker for the client.
#define GNUNET_MESSAGE_TYPE_SET_CREATE
Create a new local set.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:562
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1334
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:975
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1272
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2330
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2249
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
GNUNET_SET_OperationType
The operation that a set supports.
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1184
@ GNUNET_SET_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SET_OPERATION_INTERSECTION
Set intersection, only return elements that are in both sets.
@ GNUNET_SET_OPERATION_UNION
Set union, return all elements that are in at least one of the sets.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
#define _(String)
GNU gettext support macro.
Definition: platform.h:177
Bloom filter messages exchanged for set intersection calculation.
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
struct MutationEvent * mutations
If mutations is not NULL, it contains a list of mutations, ordered by increasing generation.
unsigned int mutations_size
Number of elements in the array mutations.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Handle to a client that is connected to a service.
Definition: service.c:252
Handle to a service.
Definition: service.c:118
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: set.h:83
Sent to the service by the client in order to cancel a set operation.
Definition: set.h:307
Client connects to a lazily copied set.
Definition: set.h:385
Server responds to a lazy copy request.
Definition: set.h:368
uint32_t cookie
Temporary name for the copied set.
Definition: set.h:377
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: set.h:41
Message sent by client to the service to add or remove an element to/from the set.
Definition: set.h:281
Element stored in a set.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: set.h:181
Client acknowledges receiving element in iteration.
Definition: set.h:351
uint32_t send_more
Non-zero if the service should continue sending elements.
Definition: set.h:360
Set element transmitted by service to client in response to a set iteration request.
Definition: set.h:325
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: set.h:60
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: set.h:136
A request for an operation with another client.
Definition: set.h:153
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: set.h:168
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: set.h:163
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: set.h:245
uint32_t request_id
id the result belongs to
Definition: set.h:259
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SET_Status in NBO.
Definition: set.h:265
Handle for the service.
Context for the garbage_collect_cb().
unsigned int max_op_generation
Largest generation for which an operation is still pending.
unsigned int min_op_generation
Lowest generation for which an operation is still pending.
struct GNUNET_CONTAINER_MultiHashMap * map
Map for which we are garbage collecting removed elements.
unsigned int start
First generation that is excluded.
unsigned int end
Generation after the last excluded generation.
Message containing buckets of an invertible bloom filter.
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Last message, sent to confirm the final set.
During intersection, the first (and possibly second) message sent is the number of elements in the se...
Lazy copy requests made by a client.
struct LazyCopyRequest * next
Kept in a DLL.
struct LazyCopyRequest * prev
Kept in a DLL.
struct Set * source_set
Which set are we supposed to copy?
uint32_t cookie
Cookie identifying the request.
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
enum GNUNET_SET_OperationType operation
The type of the operation.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
MutationEvent gives information about changes to an element (removal / addition) in a set content.
unsigned int generation
First generation affected by this mutation event.
int added
If added is GNUNET_YES, then this is an add event, otherwise it is a remove event.
Operation context used to execute a set operation.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
struct Listener * listener
Port this operation runs on.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
Information about a mutation to apply to a set.
struct Set * set
Set this mutation is about.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
struct PendingMutation * pending_mutations_head
Mutations requested by the client that we're unable to execute right now because we're iterating over...
unsigned int refcount
Number of references to the content.
struct PendingMutation * pending_mutations_tail
Mutations requested by the client that we're unable to execute right now because we're iterating over...
SetCreateImpl create
Callback for the set creation.
SetAddRemoveImpl add
Callback for element insertion.
OpEvaluateImpl evaluate
Callback for starting evaluation with a remote peer.
OpCancelImpl cancel
Callback for canceling an operation.
SetAddRemoveImpl remove
Callback for element removal.
SetCopyStateImpl copy_state
Callback for making a copy of a set's internal state.
OpAcceptImpl accept
Callback for accepting a set operation request.
SetDestroyImpl destroy_set
Callback for destruction of the set state.
A set that supports a specific operation with other peers.
unsigned int iter_generation
Generation we're currently iterating over.
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct SetState * state
Implementation-specific state.
enum GNUNET_SET_OperationType operation
Type of operation supported for this set.
uint16_t iteration_id
Each iter is assigned a unique number, so that the client can distinguish iterations.
struct GenerationRange * excluded_generations
List of generations we have to exclude, due to lazy copies.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
const struct SetVT * vt
Virtual table for this set.
struct GNUNET_CONTAINER_MultiHashMapIterator * iter
Current state of iterating elements for the client.
struct ClientState * cs
Client that owns the set.
unsigned int excluded_generations_size
Number of elements in array excluded_generations.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...
Strata estimator together with the peer's overall set size.