GNUnet 0.21.1
gnunet-service-set.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL-3.0-or-later
19 */
26#include "platform.h"
27#include "gnunet-service-set.h"
32
37#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
38
39
44{
49
54
58 struct Set *source_set;
59
63 uint32_t cookie;
64};
65
66
72{
76 struct Listener *next;
77
81 struct Listener *prev;
82
89
96
102
107
113
118};
119
120
126
131
136
140static uint32_t lazy_copy_cookie;
141
146
150static struct Listener *listener_head;
151
155static struct Listener *listener_tail;
156
160static unsigned int num_clients;
161
166static int in_shutdown;
167
174static uint32_t suggest_id;
175
176
184static struct Operation *
185get_incoming (uint32_t id)
186{
187 for (struct Listener *listener = listener_head; NULL != listener;
189 {
190 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
191 if (op->suggest_id == id)
192 return op;
193 }
194 return NULL;
195}
196
197
203static void
205{
206 struct Listener *listener;
207
209 "Destroying incoming operation %p\n",
210 op);
211 if (NULL != (listener = op->listener))
212 {
213 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
214 op->listener = NULL;
215 }
216 if (NULL != op->timeout_task)
217 {
218 GNUNET_SCHEDULER_cancel (op->timeout_task);
219 op->timeout_task = NULL;
220 }
222}
223
224
229{
234
238 unsigned int min_op_generation;
239
243 unsigned int max_op_generation;
244};
245
246
256static int
257garbage_collect_cb (void *cls, const struct GNUNET_HashCode *key, void *value)
258{
259 // struct GarbageContext *gc = cls;
260 // struct ElementEntry *ee = value;
261
262 // if (GNUNET_YES != ee->removed)
263 // return GNUNET_OK;
264 // if ( (gc->max_op_generation < ee->generation_added) ||
265 // (ee->generation_removed > gc->min_op_generation) )
266 // {
267 // GNUNET_assert (GNUNET_YES ==
268 // GNUNET_CONTAINER_multihashmap_remove (gc->map,
269 // key,
270 // ee));
271 // GNUNET_free (ee);
272 // }
273 return GNUNET_OK;
274}
275
276
284static void
286{
287 struct GarbageContext gc;
288
289 gc.min_op_generation = UINT_MAX;
290 gc.max_op_generation = 0;
291 for (struct Operation *op = set->ops_head; NULL != op; op = op->next)
292 {
294 GNUNET_MIN (gc.min_op_generation, op->generation_created);
296 GNUNET_MAX (gc.max_op_generation, op->generation_created);
297 }
298 gc.map = set->content->elements;
301 &gc);
302}
303
304
313static int
314is_excluded_generation (unsigned int generation,
315 struct GenerationRange *excluded,
316 unsigned int excluded_size)
317{
318 for (unsigned int i = 0; i < excluded_size; i++)
319 if ((generation >= excluded[i].start) && (generation < excluded[i].end))
320 return GNUNET_YES;
321 return GNUNET_NO;
322}
323
324
334static int
336 unsigned int query_generation,
337 struct GenerationRange *excluded,
338 unsigned int excluded_size)
339{
340 struct MutationEvent *mut;
341 int is_present;
342
343 GNUNET_assert (NULL != ee->mutations);
344 if (GNUNET_YES ==
345 is_excluded_generation (query_generation, excluded, excluded_size))
346 {
347 GNUNET_break (0);
348 return GNUNET_NO;
349 }
350
351 is_present = GNUNET_NO;
352
353 /* Could be made faster with binary search, but lists
354 are small, so why bother. */
355 for (unsigned int i = 0; i < ee->mutations_size; i++)
356 {
357 mut = &ee->mutations[i];
358
359 if (mut->generation > query_generation)
360 {
361 /* The mutation doesn't apply to our generation
362 anymore. We can'b break here, since mutations aren't
363 sorted by generation. */
364 continue;
365 }
366
367 if (GNUNET_YES ==
368 is_excluded_generation (mut->generation, excluded, excluded_size))
369 {
370 /* The generation is excluded (because it belongs to another
371 fork via a lazy copy) and thus mutations aren't considered
372 for membership testing. */
373 continue;
374 }
375
376 /* This would be an inconsistency in how we manage mutations. */
377 if ((GNUNET_YES == is_present) && (GNUNET_YES == mut->added))
378 GNUNET_assert (0);
379 /* Likewise. */
380 if ((GNUNET_NO == is_present) && (GNUNET_NO == mut->added))
381 GNUNET_assert (0);
382
383 is_present = mut->added;
384 }
385
386 return is_present;
387}
388
389
397int
399{
400 return is_element_of_generation (ee,
401 op->generation_created,
402 op->set->excluded_generations,
403 op->set->excluded_generations_size);
404}
405
406
420void
422{
423 struct Set *set = op->set;
424 struct GNUNET_CADET_Channel *channel;
425
426 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
427 GNUNET_assert (NULL == op->listener);
428 if (NULL != op->state)
429 {
430 set->vt->cancel (op);
431 op->state = NULL;
432 }
433 if (NULL != set)
434 {
436 op->set = NULL;
437 }
438 if (NULL != op->context_msg)
439 {
440 GNUNET_free (op->context_msg);
441 op->context_msg = NULL;
442 }
443 if (NULL != (channel = op->channel))
444 {
445 /* This will free op; called conditionally as this helper function
446 is also called from within the channel disconnect handler. */
447 op->channel = NULL;
449 }
450 if ((NULL != set) && (GNUNET_YES == gc))
452 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
453 * there was a channel end handler that will free 'op' on the call stack. */
454}
455
456
465static void *
467 struct GNUNET_SERVICE_Client *c,
468 struct GNUNET_MQ_Handle *mq)
469{
470 struct ClientState *cs;
471
472 num_clients++;
473 cs = GNUNET_new (struct ClientState);
474 cs->client = c;
475 cs->mq = mq;
476 return cs;
477}
478
479
488static int
490 const struct GNUNET_HashCode *key,
491 void *value)
492{
493 struct ElementEntry *ee = value;
494
496 GNUNET_free (ee);
497 return GNUNET_YES;
498}
499
500
508static void
510 struct GNUNET_SERVICE_Client *client,
511 void *internal_cls)
512{
513 struct ClientState *cs = internal_cls;
514 struct Operation *op;
515 struct Listener *listener;
516 struct Set *set;
517
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
519 if (NULL != (set = cs->set))
520 {
521 struct SetContent *content = set->content;
522 struct PendingMutation *pm;
523 struct PendingMutation *pm_current;
524 struct LazyCopyRequest *lcr;
525
526 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
527 /* Destroy pending set operations */
528 while (NULL != set->ops_head)
530
531 /* Destroy operation-specific state */
532 GNUNET_assert (NULL != set->state);
533 set->vt->destroy_set (set->state);
534 set->state = NULL;
535
536 /* Clean up ongoing iterations */
537 if (NULL != set->iter)
538 {
540 set->iter = NULL;
541 set->iteration_id++;
542 }
543
544 /* discard any pending mutations that reference this set */
545 pm = content->pending_mutations_head;
546 while (NULL != pm)
547 {
548 pm_current = pm;
549 pm = pm->next;
550 if (pm_current->set == set)
551 {
553 content->pending_mutations_tail,
554 pm_current);
555 GNUNET_free (pm_current);
556 }
557 }
558
559 /* free set content (or at least decrement RC) */
560 set->content = NULL;
561 GNUNET_assert (0 != content->refcount);
562 content->refcount--;
563 if (0 == content->refcount)
564 {
565 GNUNET_assert (NULL != content->elements);
568 NULL);
570 content->elements = NULL;
571 GNUNET_free (content);
572 }
574 set->excluded_generations = NULL;
575
576 /* remove set from pending copy requests */
577 lcr = lazy_copy_head;
578 while (NULL != lcr)
579 {
580 struct LazyCopyRequest *lcr_current = lcr;
581
582 lcr = lcr->next;
583 if (lcr_current->source_set == set)
584 {
587 lcr_current);
588 GNUNET_free (lcr_current);
589 }
590 }
591 GNUNET_free (set);
592 }
593
594 if (NULL != (listener = cs->listener))
595 {
596 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
598 listener->open_port = NULL;
599 while (NULL != (op = listener->op_head))
600 {
602 "Destroying incoming operation `%u' from peer `%s'\n",
603 (unsigned int) op->client_request_id,
604 GNUNET_i2s (&op->peer));
606 }
608 GNUNET_free (listener);
609 }
610 GNUNET_free (cs);
611 num_clients--;
612 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
613 {
614 if (NULL != cadet)
615 {
617 cadet = NULL;
618 }
619 }
620}
621
622
631static int
633{
634 struct Operation *op = cls;
635 struct Listener *listener = op->listener;
636 const struct GNUNET_MessageHeader *nested_context;
637
638 /* double operation request */
639 if (0 != op->suggest_id)
640 {
641 GNUNET_break_op (0);
642 return GNUNET_SYSERR;
643 }
644 /* This should be equivalent to the previous condition, but can't hurt to check twice */
645 if (NULL == op->listener)
646 {
647 GNUNET_break (0);
648 return GNUNET_SYSERR;
649 }
650 if (listener->operation !=
651 (enum GNUNET_SET_OperationType) ntohl (msg->operation))
652 {
653 GNUNET_break_op (0);
654 return GNUNET_SYSERR;
655 }
656 nested_context = GNUNET_MQ_extract_nested_mh (msg);
657 if ((NULL != nested_context) &&
658 (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE))
659 {
660 GNUNET_break_op (0);
661 return GNUNET_SYSERR;
662 }
663 return GNUNET_OK;
664}
665
666
684static void
686{
687 struct Operation *op = cls;
688 struct Listener *listener = op->listener;
689 const struct GNUNET_MessageHeader *nested_context;
690 struct GNUNET_MQ_Envelope *env;
691 struct GNUNET_SET_RequestMessage *cmsg;
692
693 nested_context = GNUNET_MQ_extract_nested_mh (msg);
694 /* Make a copy of the nested_context (application-specific context
695 information that is opaque to set) so we can pass it to the
696 listener later on */
697 if (NULL != nested_context)
698 op->context_msg = GNUNET_copy_message (nested_context);
699 op->remote_element_count = ntohl (msg->element_count);
700 GNUNET_log (
702 "Received P2P operation request (op %u, port %s) for active listener\n",
703 (uint32_t) ntohl (msg->operation),
704 GNUNET_h2s (&op->listener->app_id));
705 GNUNET_assert (0 == op->suggest_id);
706 if (0 == suggest_id)
707 suggest_id++;
708 op->suggest_id = suggest_id++;
709 GNUNET_assert (NULL != op->timeout_task);
710 GNUNET_SCHEDULER_cancel (op->timeout_task);
711 op->timeout_task = NULL;
714 op->context_msg);
715 GNUNET_log (
717 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
718 op->suggest_id,
719 listener,
720 listener->cs);
721 cmsg->accept_id = htonl (op->suggest_id);
722 cmsg->peer_id = op->peer;
723 GNUNET_MQ_send (listener->cs->mq, env);
724 /* NOTE: GNUNET_CADET_receive_done() will be called in
725 #handle_client_accept() */
726}
727
728
735static void
736execute_add (struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
737{
738 struct GNUNET_SET_Element el;
739 struct ElementEntry *ee;
740 struct GNUNET_HashCode hash;
741
743 el.size = ntohs (msg->header.size) - sizeof(*msg);
744 el.data = &msg[1];
745 el.element_type = ntohs (msg->element_type);
746 GNUNET_SET_element_hash (&el, &hash);
748 if (NULL == ee)
749 {
751 "Client inserts element %s of size %u\n",
752 GNUNET_h2s (&hash),
753 el.size);
754 ee = GNUNET_malloc (el.size + sizeof(*ee));
755 ee->element.size = el.size;
756 GNUNET_memcpy (&ee[1], el.data, el.size);
757 ee->element.data = &ee[1];
758 ee->element.element_type = el.element_type;
759 ee->remote = GNUNET_NO;
760 ee->mutations = NULL;
761 ee->mutations_size = 0;
762 ee->element_hash = hash;
765 set->content->elements,
766 &ee->element_hash,
767 ee,
769 }
770 else if (GNUNET_YES ==
775 {
777 "Client inserted element %s of size %u twice (ignored)\n",
778 GNUNET_h2s (&hash),
779 el.size);
780
781 /* same element inserted twice */
782 return;
783 }
784
785 {
786 struct MutationEvent mut = { .generation = set->current_generation,
787 .added = GNUNET_YES };
789 }
790 set->vt->add (set->state, ee);
791}
792
793
800static void
802{
803 struct GNUNET_SET_Element el;
804 struct ElementEntry *ee;
805 struct GNUNET_HashCode hash;
806
808 el.size = ntohs (msg->header.size) - sizeof(*msg);
809 el.data = &msg[1];
810 el.element_type = ntohs (msg->element_type);
811 GNUNET_SET_element_hash (&el, &hash);
813 if (NULL == ee)
814 {
815 /* Client tried to remove non-existing element. */
817 "Client removes non-existing element of size %u\n",
818 el.size);
819 return;
820 }
825 {
826 /* Client tried to remove element twice */
828 "Client removed element of size %u twice (ignored)\n",
829 el.size);
830 return;
831 }
832 else
833 {
834 struct MutationEvent mut = { .generation = set->current_generation,
835 .added = GNUNET_NO };
836
838 "Client removes element of size %u\n",
839 el.size);
840
842 }
843 set->vt->remove (set->state, ee);
844}
845
846
853static void
855{
856 switch (ntohs (msg->header.type))
857 {
859 execute_add (set, msg);
860 break;
861
863 execute_remove (set, msg);
864 break;
865
866 default:
867 GNUNET_break (0);
868 }
869}
870
871
878static void
880{
881 struct PendingMutation *pm;
882
883 if (0 != set->content->iterator_count)
884 return; /* still cannot do this */
885 while (NULL != (pm = set->content->pending_mutations_head))
886 {
889 pm);
891 "Executing pending mutation on %p.\n",
892 pm->set);
893 execute_mutation (pm->set, pm->msg);
894 GNUNET_free (pm->msg);
895 GNUNET_free (pm);
896 }
897}
898
899
913static void
915{
916 int ret;
917 struct ElementEntry *ee;
918 struct GNUNET_MQ_Envelope *ev;
920
921 GNUNET_assert (NULL != set->iter);
922 do
923 {
925 NULL,
926 (const void **) &ee);
927 if (GNUNET_NO == ret)
928 {
929 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iteration on %p done.\n", set);
932 set->iter = NULL;
933 set->iteration_id++;
935 set->content->iterator_count--;
937 GNUNET_MQ_send (set->cs->mq, ev);
938 return;
939 }
940 GNUNET_assert (NULL != ee);
941 }
942 while (GNUNET_NO ==
944 set->iter_generation,
948 "Sending iteration element on %p.\n",
949 set);
951 ee->element.size,
953 GNUNET_memcpy (&msg[1], ee->element.data, ee->element.size);
954 msg->element_type = htons (ee->element.element_type);
955 msg->iteration_id = htons (set->iteration_id);
956 GNUNET_MQ_send (set->cs->mq, ev);
957}
958
959
969static void
971{
972 struct ClientState *cs = cls;
973 struct Set *set;
974
975 if (NULL == (set = cs->set))
976 {
977 /* attempt to iterate over a non existing set */
978 GNUNET_break (0);
980 return;
981 }
982 if (NULL != set->iter)
983 {
984 /* Only one concurrent iterate-action allowed per set */
985 GNUNET_break (0);
987 return;
988 }
990 "Iterating set %p in gen %u with %u content elements\n",
991 (void *) set,
995 set->content->iterator_count++;
996 set->iter =
1000}
1001
1002
1011static void
1013{
1014 struct ClientState *cs = cls;
1015 struct Set *set;
1016
1018 "Client created new set (operation %u)\n",
1019 (uint32_t) ntohl (msg->operation));
1020 if (NULL != cs->set)
1021 {
1022 /* There can only be one set per client */
1023 GNUNET_break (0);
1025 return;
1026 }
1027 set = GNUNET_new (struct Set);
1028 switch (ntohl (msg->operation))
1029 {
1031 set->vt = _GSS_intersection_vt ();
1032 break;
1033
1035 set->vt = _GSS_union_vt ();
1036 break;
1037
1038 default:
1039 GNUNET_free (set);
1040 GNUNET_break (0);
1042 return;
1043 }
1044 set->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1045 set->state = set->vt->create ();
1046 if (NULL == set->state)
1047 {
1048 /* initialization failed (i.e. out of memory) */
1049 GNUNET_free (set);
1051 return;
1052 }
1053 set->content = GNUNET_new (struct SetContent);
1054 set->content->refcount = 1;
1056 set->cs = cs;
1057 cs->set = set;
1059}
1060
1061
1070static void
1072{
1073 struct Operation *op = cls;
1074
1075 op->timeout_task = NULL;
1077 "Remote peer's incoming request timed out\n");
1079}
1080
1081
1098static void *
1101 const struct GNUNET_PeerIdentity *source)
1102{
1103 struct Listener *listener = cls;
1104 struct Operation *op;
1105
1106 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "New incoming channel\n");
1107 op = GNUNET_new (struct Operation);
1108 op->listener = listener;
1109 op->peer = *source;
1110 op->channel = channel;
1111 op->mq = GNUNET_CADET_get_mq (op->channel);
1115 op);
1117 return op;
1118}
1119
1120
1137static void
1138channel_end_cb (void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
1139{
1140 struct Operation *op = channel_ctx;
1141
1142 op->channel = NULL;
1144}
1145
1146
1152void
1154{
1155 struct GNUNET_CADET_Channel *channel;
1156
1157 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "channel_end_cb called\n");
1158 if (NULL != (channel = op->channel))
1159 {
1160 /* This will free op; called conditionally as this helper function
1161 is also called from within the channel disconnect handler. */
1162 op->channel = NULL;
1164 }
1165 if (NULL != op->listener)
1166 {
1168 return;
1169 }
1170 if (NULL != op->set)
1171 op->set->vt->channel_death (op);
1172 else
1174 GNUNET_free (op);
1175}
1176
1177
1192static void
1194 const struct GNUNET_CADET_Channel *channel,
1195 int window_size)
1196{
1197 /* FIXME: not implemented, we could do flow control here... */
1198}
1199
1200
1207static void
1209{
1210 struct ClientState *cs = cls;
1211 struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1212 { GNUNET_MQ_hd_var_size (incoming_msg,
1215 NULL),
1216 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1218 struct IBFMessage,
1219 NULL),
1220 GNUNET_MQ_hd_var_size (union_p2p_elements,
1223 NULL),
1224 GNUNET_MQ_hd_var_size (union_p2p_offer,
1226 struct GNUNET_MessageHeader,
1227 NULL),
1228 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1230 struct InquiryMessage,
1231 NULL),
1232 GNUNET_MQ_hd_var_size (union_p2p_demand,
1234 struct GNUNET_MessageHeader,
1235 NULL),
1236 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1238 struct GNUNET_MessageHeader,
1239 NULL),
1240 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1242 struct GNUNET_MessageHeader,
1243 NULL),
1244 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1246 struct GNUNET_MessageHeader,
1247 NULL),
1248 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1250 struct GNUNET_MessageHeader,
1251 NULL),
1252 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1255 NULL),
1256 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1259 NULL),
1260 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1263 NULL),
1264 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1267 NULL),
1268 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1270 struct BFMessage,
1271 NULL),
1272 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1275 NULL),
1277 struct Listener *listener;
1278
1279 if (NULL != cs->listener)
1280 {
1281 /* max. one active listener per client! */
1282 GNUNET_break (0);
1284 return;
1285 }
1286 listener = GNUNET_new (struct Listener);
1287 listener->cs = cs;
1288 cs->listener = listener;
1289 listener->app_id = msg->app_id;
1290 listener->operation = (enum GNUNET_SET_OperationType) ntohl (msg->operation);
1293 "New listener created (op %u, port %s)\n",
1294 listener->operation,
1295 GNUNET_h2s (&listener->app_id));
1297 &msg->app_id,
1299 listener,
1302 cadet_handlers);
1304}
1305
1306
1314static void
1316{
1317 struct ClientState *cs = cls;
1318 struct Operation *op;
1319
1320 op = get_incoming (ntohl (msg->accept_reject_id));
1321 if (NULL == op)
1322 {
1323 /* no matching incoming operation for this reject;
1324 could be that the other peer already disconnected... */
1326 "Client rejected unknown operation %u\n",
1327 (unsigned int) ntohl (msg->accept_reject_id));
1329 return;
1330 }
1332 "Peer request (op %u, app %s) rejected by client\n",
1333 op->listener->operation,
1334 GNUNET_h2s (&cs->listener->app_id));
1337}
1338
1339
1346static int
1348{
1349 /* NOTE: Technically, we should probably check with the
1350 block library whether the element we are given is well-formed */
1351 return GNUNET_OK;
1352}
1353
1354
1361static void
1363{
1364 struct ClientState *cs = cls;
1365 struct Set *set;
1366
1367 if (NULL == (set = cs->set))
1368 {
1369 /* client without a set requested an operation */
1370 GNUNET_break (0);
1372 return;
1373 }
1375
1376 if (0 != set->content->iterator_count)
1377 {
1378 struct PendingMutation *pm;
1379
1380 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Scheduling mutation on set\n");
1381 pm = GNUNET_new (struct PendingMutation);
1382 pm->msg =
1384 pm->set = set;
1387 pm);
1388 return;
1389 }
1390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
1391 execute_mutation (set, msg);
1392}
1393
1394
1401static void
1403{
1404 struct GenerationRange r;
1405
1407 {
1408 set->content->latest_generation++;
1409 set->current_generation++;
1410 return;
1411 }
1412
1414
1415 r.start = set->current_generation + 1;
1416 r.end = set->content->latest_generation + 1;
1417 set->content->latest_generation = r.end;
1418 set->current_generation = r.end;
1421 r);
1422}
1423
1424
1434static int
1436{
1437 /* FIXME: suboptimal, even if the context below could be NULL,
1438 there are malformed messages this does not check for... */
1439 return GNUNET_OK;
1440}
1441
1442
1451static void
1453{
1454 struct ClientState *cs = cls;
1455 struct Operation *op = GNUNET_new (struct Operation);
1456 const struct GNUNET_MQ_MessageHandler cadet_handlers[] =
1457 { GNUNET_MQ_hd_var_size (incoming_msg,
1460 op),
1461 GNUNET_MQ_hd_var_size (union_p2p_ibf,
1463 struct IBFMessage,
1464 op),
1465 GNUNET_MQ_hd_var_size (union_p2p_elements,
1468 op),
1469 GNUNET_MQ_hd_var_size (union_p2p_offer,
1471 struct GNUNET_MessageHeader,
1472 op),
1473 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
1475 struct InquiryMessage,
1476 op),
1477 GNUNET_MQ_hd_var_size (union_p2p_demand,
1479 struct GNUNET_MessageHeader,
1480 op),
1481 GNUNET_MQ_hd_fixed_size (union_p2p_done,
1483 struct GNUNET_MessageHeader,
1484 op),
1485 GNUNET_MQ_hd_fixed_size (union_p2p_over,
1487 struct GNUNET_MessageHeader,
1488 op),
1489 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
1491 struct GNUNET_MessageHeader,
1492 op),
1493 GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
1495 struct GNUNET_MessageHeader,
1496 op),
1497 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1500 op),
1501 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
1504 op),
1505 GNUNET_MQ_hd_var_size (union_p2p_full_element,
1508 op),
1509 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1512 op),
1513 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1515 struct BFMessage,
1516 op),
1517 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1520 op),
1522 struct Set *set;
1523 const struct GNUNET_MessageHeader *context;
1524
1525 if (NULL == (set = cs->set))
1526 {
1527 GNUNET_break (0);
1528 GNUNET_free (op);
1530 return;
1531 }
1533 op->peer = msg->target_peer;
1534 op->result_mode = ntohl (msg->result_mode);
1535 op->client_request_id = ntohl (msg->request_id);
1536 op->byzantine = msg->byzantine;
1537 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1538 op->force_full = msg->force_full;
1539 op->force_delta = msg->force_delta;
1541
1542 /* Advance generation values, so that
1543 mutations won't interfere with the running operation. */
1544 op->set = set;
1545 op->generation_created = set->current_generation;
1546 advance_generation (set);
1549 "Creating new CADET channel to port %s for set operation type %u\n",
1550 GNUNET_h2s (&msg->app_id),
1551 set->operation);
1553 op,
1554 &msg->target_peer,
1555 &msg->app_id,
1558 cadet_handlers);
1559 op->mq = GNUNET_CADET_get_mq (op->channel);
1560 op->state = set->vt->evaluate (op, context);
1561 if (NULL == op->state)
1562 {
1563 GNUNET_break (0);
1565 return;
1566 }
1568}
1569
1570
1579static void
1581{
1582 struct ClientState *cs = cls;
1583 struct Set *set;
1584
1585 if (NULL == (set = cs->set))
1586 {
1587 /* client without a set acknowledged receiving a value */
1588 GNUNET_break (0);
1590 return;
1591 }
1592 if (NULL == set->iter)
1593 {
1594 /* client sent an ack, but we were not expecting one (as
1595 set iteration has finished) */
1596 GNUNET_break (0);
1598 return;
1599 }
1601 if (ntohl (ack->send_more))
1602 {
1603 send_client_element (set);
1604 }
1605 else
1606 {
1608 set->iter = NULL;
1609 set->iteration_id++;
1610 }
1611}
1612
1613
1620static void
1622 const struct GNUNET_MessageHeader *mh)
1623{
1624 struct ClientState *cs = cls;
1625 struct Set *set;
1626 struct LazyCopyRequest *cr;
1627 struct GNUNET_MQ_Envelope *ev;
1628 struct GNUNET_SET_CopyLazyResponseMessage *resp_msg;
1629
1630 if (NULL == (set = cs->set))
1631 {
1632 /* client without a set requested an operation */
1633 GNUNET_break (0);
1635 return;
1636 }
1638 "Client requested creation of lazy copy\n");
1639 cr = GNUNET_new (struct LazyCopyRequest);
1640 cr->cookie = ++lazy_copy_cookie;
1641 cr->source_set = set;
1644 resp_msg->cookie = cr->cookie;
1645 GNUNET_MQ_send (set->cs->mq, ev);
1647}
1648
1649
1656static void
1658 void *cls,
1660{
1661 struct ClientState *cs = cls;
1662 struct LazyCopyRequest *cr;
1663 struct Set *set;
1664 int found;
1665
1666 if (NULL != cs->set)
1667 {
1668 /* There can only be one set per client */
1669 GNUNET_break (0);
1671 return;
1672 }
1673 found = GNUNET_NO;
1674 for (cr = lazy_copy_head; NULL != cr; cr = cr->next)
1675 {
1676 if (cr->cookie == msg->cookie)
1677 {
1678 found = GNUNET_YES;
1679 break;
1680 }
1681 }
1682 if (GNUNET_NO == found)
1683 {
1684 /* client asked for copy with cookie we don't know */
1685 GNUNET_break (0);
1687 return;
1688 }
1691 "Client %p requested use of lazy copy\n",
1692 cs);
1693 set = GNUNET_new (struct Set);
1694 switch (cr->source_set->operation)
1695 {
1697 set->vt = _GSS_intersection_vt ();
1698 break;
1699
1701 set->vt = _GSS_union_vt ();
1702 break;
1703
1704 default:
1705 GNUNET_assert (0);
1706 return;
1707 }
1708
1709 if (NULL == set->vt->copy_state)
1710 {
1711 /* Lazy copy not supported for this set operation */
1712 GNUNET_break (0);
1713 GNUNET_free (set);
1714 GNUNET_free (cr);
1716 return;
1717 }
1718
1719 set->operation = cr->source_set->operation;
1720 set->state = set->vt->copy_state (cr->source_set->state);
1721 set->content = cr->source_set->content;
1722 set->content->refcount++;
1723
1729 * sizeof(struct GenerationRange));
1730
1731 /* Advance the generation of the new set, so that mutations to the
1732 cloned set and to the source set are independent. */
1733 advance_generation (set);
1734 set->cs = cs;
1735 cs->set = set;
1736 GNUNET_free (cr);
1738}
1739
1740
1747static void
1749{
1750 struct ClientState *cs = cls;
1751 struct Set *set;
1752 struct Operation *op;
1753 int found;
1754
1755 if (NULL == (set = cs->set))
1756 {
1757 /* client without a set requested an operation */
1758 GNUNET_break (0);
1760 return;
1761 }
1762 found = GNUNET_NO;
1763 for (op = set->ops_head; NULL != op; op = op->next)
1764 {
1765 if (op->client_request_id == ntohl (msg->request_id))
1766 {
1767 found = GNUNET_YES;
1768 break;
1769 }
1770 }
1771 if (GNUNET_NO == found)
1772 {
1773 /* It may happen that the operation was already destroyed due to
1774 * the other peer disconnecting. The client may not know about this
1775 * yet and try to cancel the (just barely non-existent) operation.
1776 * So this is not a hard error.
1778 "Client canceled non-existent op %u\n",
1779 (uint32_t) ntohl (msg->request_id));
1780 }
1781 else
1782 {
1784 "Client requested cancel for op %u\n",
1785 (uint32_t) ntohl (msg->request_id));
1787 }
1789}
1790
1791
1800static void
1802{
1803 struct ClientState *cs = cls;
1804 struct Set *set;
1805 struct Operation *op;
1806 struct GNUNET_SET_ResultMessage *result_message;
1807 struct GNUNET_MQ_Envelope *ev;
1808 struct Listener *listener;
1809
1810 if (NULL == (set = cs->set))
1811 {
1812 /* client without a set requested to accept */
1813 GNUNET_break (0);
1815 return;
1816 }
1817 op = get_incoming (ntohl (msg->accept_reject_id));
1818 if (NULL == op)
1819 {
1820 /* It is not an error if the set op does not exist -- it may
1821 * have been destroyed when the partner peer disconnected. */
1822 GNUNET_log (
1824 "Client %p accepted request %u of listener %p that is no longer active\n",
1825 cs,
1826 ntohl (msg->accept_reject_id),
1827 cs->listener);
1828 ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
1829 result_message->request_id = msg->request_id;
1830 result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
1831 GNUNET_MQ_send (set->cs->mq, ev);
1833 return;
1834 }
1836 "Client accepting request %u\n",
1837 (uint32_t) ntohl (msg->accept_reject_id));
1838 listener = op->listener;
1839 op->listener = NULL;
1840 GNUNET_CONTAINER_DLL_remove (listener->op_head, listener->op_tail, op);
1841 op->set = set;
1843 op->client_request_id = ntohl (msg->request_id);
1844 op->result_mode = ntohl (msg->result_mode);
1845 op->byzantine = msg->byzantine;
1846 op->byzantine_lower_bound = msg->byzantine_lower_bound;
1847 op->force_full = msg->force_full;
1848 op->force_delta = msg->force_delta;
1849
1850 /* Advance generation values, so that future mutations do not
1851 interfere with the running operation. */
1852 op->generation_created = set->current_generation;
1853 advance_generation (set);
1854 GNUNET_assert (NULL == op->state);
1855 op->state = set->vt->accept (op);
1856 if (NULL == op->state)
1857 {
1858 GNUNET_break (0);
1860 return;
1861 }
1862 /* Now allow CADET to continue, as we did not do this in
1863 #handle_incoming_msg (as we wanted to first see if the
1864 local client would accept the request). */
1865 GNUNET_CADET_receive_done (op->channel);
1867}
1868
1869
1875static void
1876shutdown_task (void *cls)
1877{
1878 /* Delay actual shutdown to allow service to disconnect clients */
1880 if (0 == num_clients)
1881 {
1882 if (NULL != cadet)
1883 {
1885 cadet = NULL;
1886 }
1887 }
1889 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
1890}
1891
1892
1901static void
1902run (void *cls,
1903 const struct GNUNET_CONFIGURATION_Handle *cfg,
1905{
1906 /* FIXME: need to modify SERVICE (!) API to allow
1907 us to run a shutdown task *after* clients were
1908 forcefully disconnected! */
1912 if (NULL == cadet)
1913 {
1915 _ ("Could not connect to CADET service\n"));
1917 return;
1918 }
1919}
1920
1921
1926 "set",
1928 &run,
1931 NULL,
1932 GNUNET_MQ_hd_fixed_size (client_accept,
1935 NULL),
1936 GNUNET_MQ_hd_fixed_size (client_iter_ack,
1939 NULL),
1940 GNUNET_MQ_hd_var_size (client_mutation,
1943 NULL),
1944 GNUNET_MQ_hd_fixed_size (client_create_set,
1947 NULL),
1948 GNUNET_MQ_hd_fixed_size (client_iterate,
1950 struct GNUNET_MessageHeader,
1951 NULL),
1952 GNUNET_MQ_hd_var_size (client_evaluate,
1955 NULL),
1956 GNUNET_MQ_hd_fixed_size (client_listen,
1959 NULL),
1960 GNUNET_MQ_hd_fixed_size (client_reject,
1963 NULL),
1964 GNUNET_MQ_hd_var_size (client_mutation,
1967 NULL),
1968 GNUNET_MQ_hd_fixed_size (client_cancel,
1971 NULL),
1972 GNUNET_MQ_hd_fixed_size (client_copy_lazy_prepare,
1974 struct GNUNET_MessageHeader,
1975 NULL),
1976 GNUNET_MQ_hd_fixed_size (client_copy_lazy_connect,
1979 NULL),
1981
1982
1983/* end of gnunet-service-set.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:104
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
static int ret
Final status code.
Definition: gnunet-arm.c:94
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:109
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static struct GNUNET_IDENTITY_EgoLookup * el
Handle to identity lookup.
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
static int garbage_collect_cb(void *cls, const struct GNUNET_HashCode *key, void *value)
Function invoked to check if an element can be removed from the set's history because it is no longer...
void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void execute_delayed_mutations(struct Set *set)
Execute mutations that were delayed on a set because of pending operations.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static int check_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static int check_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void collect_generation_garbage(struct Set *set)
Collect and destroy elements that are not needed anymore, because their lifetime (as determined by th...
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? If GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_client_iter_ack(void *cls, const struct GNUNET_SET_IterAckMessage *ack)
Handle an ack from a client, and send the next element.
static void handle_client_mutation(void *cls, const struct GNUNET_SET_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void send_client_element(struct Set *set)
Send the next element of a set to the set's client.
static unsigned int num_clients
Number of active clients.
static void execute_mutation(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Perform a mutation on a set as specified by the msg.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void handle_client_iterate(void *cls, const struct GNUNET_MessageHeader *m)
Called when a client wants to iterate the elements of a set.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static void execute_add(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Add an element to set as specified by msg.
static void handle_client_copy_lazy_prepare(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request from the client to copy a set.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static int is_element_of_generation(struct ElementEntry *ee, unsigned int query_generation, struct GenerationRange *excluded, unsigned int excluded_size)
Is element ee part of the set during query_generation?
static int is_excluded_generation(unsigned int generation, struct GenerationRange *excluded, unsigned int excluded_size)
Is generation in the range of exclusions?
static uint32_t lazy_copy_cookie
Generator for unique cookie we set per lazy copy request.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static void execute_remove(struct Set *set, const struct GNUNET_SET_ElementMessage *msg)
Remove an element from set as specified by msg.
static struct LazyCopyRequest * lazy_copy_head
DLL of lazy copy requests by this client.
static void handle_client_copy_lazy_connect(void *cls, const struct GNUNET_SET_CopyLazyConnectMessage *msg)
Handle a request from the client to connect to a copy of a set.
static void handle_client_accept(void *cls, const struct GNUNET_SET_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static struct LazyCopyRequest * lazy_copy_tail
DLL of lazy copy requests by this client.
static void handle_client_reject(void *cls, const struct GNUNET_SET_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static void handle_client_evaluate(void *cls, const struct GNUNET_SET_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_cancel(void *cls, const struct GNUNET_SET_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SET_ACCEPT, struct GNUNET_SET_AcceptMessage, NULL), GNUNET_MQ_hd_fixed_size(client_iter_ack, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, struct GNUNET_SET_IterAckMessage, NULL), GNUNET_MQ_hd_var_size(client_mutation, GNUNET_MESSAGE_TYPE_SET_ADD, struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SET_CREATE, struct GNUNET_SET_CreateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_iterate, GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SET_EVALUATE, struct GNUNET_SET_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SET_LISTEN, struct GNUNET_SET_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SET_REJECT, struct GNUNET_SET_RejectMessage, NULL), GNUNET_MQ_hd_var_size(client_mutation, GNUNET_MESSAGE_TYPE_SET_REMOVE, struct GNUNET_SET_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SET_CANCEL, struct GNUNET_SET_CancelMessage, NULL), GNUNET_MQ_hd_fixed_size(client_copy_lazy_prepare, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_fixed_size(client_copy_lazy_connect, GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT, struct GNUNET_SET_CopyLazyConnectMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
void _GSS_operation_destroy(struct Operation *op, int gc)
Destroy the given operation.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void handle_client_listen(void *cls, const struct GNUNET_SET_ListenMessage *msg)
Called when a client wants to create a new listener.
static void handle_client_create_set(void *cls, const struct GNUNET_SET_CreateMessage *msg)
Called when a client wants to create a new set.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
Common components for the implementation of the different set operations.
const struct SetVT * _GSS_intersection_vt(void)
Get the table with implementing functions for set intersection.
const struct SetVT * _GSS_union_vt(void)
Get the table with implementing functions for set union.
two-peer set operations
Peer-to-Peer messages for gnunet set.
two-peer set operations
static struct GNUNET_TRANSPORT_PluginMonitor * pm
Handle if we are monitoring plugin session activity.
API to create, modify and access statistics.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected channel.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator's position.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_MAX(a, b)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MIN(a, b)
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_array_append(arr, len, element)
Append an element to an array (growing the array by one).
#define GNUNET_free(ptr)
Wrapper around free.
#define GNUNET_memdup(buf, size)
Allocate and initialize a block of memory.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:63
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:87
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:78
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_EVALUATE
Evaluate a set operation.
#define GNUNET_MESSAGE_TYPE_SET_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_RESPONSE
Give the client an ID for connecting to the set's copy.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SET_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SET_REQUEST
Notify the client of a request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_PREPARE
Ask the set service to prepare a copy of a set.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SET_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
Request a set operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE
Intersection operation is done.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT
Element result for the iterating client.
#define GNUNET_MESSAGE_TYPE_SET_ACCEPT
Accept a set request.
#define GNUNET_MESSAGE_TYPE_SET_RESULT
Result of a set operation.
#define GNUNET_MESSAGE_TYPE_SET_REMOVE
Remove element from set.
#define GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST
Start iteration over set elements.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO
Information about the element count for intersection.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SET_ITER_ACK
Acknowledge result from iteration.
#define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT
Sent by the client to the server to connect to an existing, lazily copied set.
#define GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF
Bloom filter message for intersection exchange started by Bob.
#define GNUNET_MESSAGE_TYPE_SET_ITER_DONE
Iteration end marker for the client.
#define GNUNET_MESSAGE_TYPE_SET_CREATE
Create a new local set.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:567
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1340
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:981
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1278
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2489
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2408
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
GNUNET_SET_OperationType
The operation that a set supports.
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1184
@ GNUNET_SET_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SET_OPERATION_INTERSECTION
Set intersection, only return elements that are in both sets.
@ GNUNET_SET_OPERATION_UNION
Set union, return all elements that are in at least one of the sets.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
Bloom filter messages exchanged for set intersection calculation.
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
struct MutationEvent * mutations
If mutations is not NULL, it contains a list of mutations, ordered by increasing generation.
unsigned int mutations_size
Number of elements in the array mutations.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Handle to a client that is connected to a service.
Definition: service.c:252
Handle to a service.
Definition: service.c:118
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: set.h:83
Sent to the service by the client in order to cancel a set operation.
Definition: set.h:307
Client connects to a lazily copied set.
Definition: set.h:385
Server responds to a lazy copy request.
Definition: set.h:368
uint32_t cookie
Temporary name for the copied set.
Definition: set.h:377
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: set.h:41
Message sent by client to the service to add or remove an element to/from the set.
Definition: set.h:281
Element stored in a set.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: set.h:181
Client acknowledges receiving element in iteration.
Definition: set.h:351
uint32_t send_more
Non-zero if the service should continue sending elements.
Definition: set.h:360
Set element transmitted by service to client in response to a set iteration request.
Definition: set.h:325
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: set.h:60
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: set.h:136
A request for an operation with another client.
Definition: set.h:153
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: set.h:168
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: set.h:163
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: set.h:245
uint32_t request_id
id the result belongs to
Definition: set.h:259
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SET_Status in NBO.
Definition: set.h:265
Handle for the service.
Context for the garbage_collect_cb().
unsigned int max_op_generation
Largest generation for which an operation is still pending.
unsigned int min_op_generation
Lowest generation for which an operation is still pending.
struct GNUNET_CONTAINER_MultiHashMap * map
Map for which we are garbage collecting removed elements.
unsigned int start
First generation that is excluded.
unsigned int end
Generation after the last excluded generation.
Message containing buckets of an invertible bloom filter.
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Last message, sent to confirm the final set.
During intersection, the first (and possibly second) message sent is the number of elements in the se...
Lazy copy requests made by a client.
struct LazyCopyRequest * next
Kept in a DLL.
struct LazyCopyRequest * prev
Kept in a DLL.
struct Set * source_set
Which set are we supposed to copy?
uint32_t cookie
Cookie identifying the request.
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
enum GNUNET_SET_OperationType operation
The type of the operation.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
MutationEvent gives information about changes to an element (removal / addition) in a set content.
unsigned int generation
First generation affected by this mutation event.
int added
If added is GNUNET_YES, then this is an add event, otherwise it is a remove event.
Operation context used to execute a set operation.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
struct Listener * listener
Port this operation runs on.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
Information about a mutation to apply to a set.
struct Set * set
Set this mutation is about.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
struct PendingMutation * pending_mutations_head
Mutations requested by the client that we're unable to execute right now because we're iterating over...
unsigned int refcount
Number of references to the content.
struct PendingMutation * pending_mutations_tail
Mutations requested by the client that we're unable to execute right now because we're iterating over...
SetCreateImpl create
Callback for the set creation.
SetAddRemoveImpl add
Callback for element insertion.
OpEvaluateImpl evaluate
Callback for starting evaluation with a remote peer.
OpCancelImpl cancel
Callback for canceling an operation.
SetAddRemoveImpl remove
Callback for element removal.
SetCopyStateImpl copy_state
Callback for making a copy of a set's internal state.
OpAcceptImpl accept
Callback for accepting a set operation request.
SetDestroyImpl destroy_set
Callback for destruction of the set state.
A set that supports a specific operation with other peers.
unsigned int iter_generation
Generation we're currently iterating over.
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct SetState * state
Implementation-specific state.
enum GNUNET_SET_OperationType operation
Type of operation supported for this set.
uint16_t iteration_id
Each iter is assigned a unique number, so that the client can distinguish iterations.
struct GenerationRange * excluded_generations
List of generations we have to exclude, due to lazy copies.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
const struct SetVT * vt
Virtual table for this set.
struct GNUNET_CONTAINER_MultiHashMapIterator * iter
Current state of iterating elements for the client.
struct ClientState * cs
Client that owns the set.
unsigned int excluded_generations_size
Number of elements in array excluded_generations.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...
Strata estimator together with the peer's overall set size.