GNUnet 0.21.1
gnunet-service-set_union.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
26#include "platform.h"
27#include "gnunet_util_lib.h"
29#include "gnunet-service-set.h"
30#include "ibf.h"
34#include <gcrypt.h>
35
36
37#define LOG(kind, ...) GNUNET_log_from (kind, "set-union", __VA_ARGS__)
38
39
43#define SE_STRATA_COUNT 32
44
48#define SE_IBF_SIZE 80
49
53#define SE_IBF_HASH_NUM 4
54
58#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
59
65#define MAX_IBF_ORDER (20)
66
71#define IBF_ALPHA 4
72
73
78{
83
94
99
104
109
115
123
130
136};
137
138
142struct OperationState
143{
149
154
159
166
171
176
181
186
190 uint32_t salt_send;
191
195 uint32_t salt_receive;
196
202
207
212 uint64_t initial_size;
213};
214
215
220{
225
233
241};
242
243
249{
255
260 struct Operation *op;
261};
262
263
267struct SetState
268{
276};
277
278
289static int
291 uint32_t key,
292 void *value)
293{
294 struct KeyEntry *k = value;
295
296 GNUNET_assert (NULL != k);
297 if (GNUNET_YES == k->element->remote)
298 {
299 GNUNET_free (k->element);
300 k->element = NULL;
301 }
302 GNUNET_free (k);
303 return GNUNET_YES;
304}
305
306
313static void
315{
317 "destroying union op\n");
318 /* check if the op was canceled twice */
319 GNUNET_assert (NULL != op->state);
320 if (NULL != op->state->remote_ibf)
321 {
322 ibf_destroy (op->state->remote_ibf);
323 op->state->remote_ibf = NULL;
324 }
325 if (NULL != op->state->demanded_hashes)
326 {
327 GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
328 op->state->demanded_hashes = NULL;
329 }
330 if (NULL != op->state->local_ibf)
331 {
332 ibf_destroy (op->state->local_ibf);
333 op->state->local_ibf = NULL;
334 }
335 if (NULL != op->state->se)
336 {
337 strata_estimator_destroy (op->state->se);
338 op->state->se = NULL;
339 }
340 if (NULL != op->state->key_to_element)
341 {
342 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
344 NULL);
345 GNUNET_CONTAINER_multihashmap32_destroy (op->state->key_to_element);
346 op->state->key_to_element = NULL;
347 }
348 GNUNET_free (op->state);
349 op->state = NULL;
351 "destroying union op done\n");
352}
353
354
361static void
363{
364 struct GNUNET_MQ_Envelope *ev;
366
368 "union operation failed\n");
370 msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
371 msg->request_id = htonl (op->client_request_id);
372 msg->element_type = htons (0);
373 GNUNET_MQ_send (op->set->cs->mq,
374 ev);
376}
377
378
386static struct IBF_Key
387get_ibf_key (const struct GNUNET_HashCode *src)
388{
389 struct IBF_Key key;
390 uint16_t salt = 0;
391
393 GNUNET_CRYPTO_kdf (&key, sizeof(key),
394 src, sizeof *src,
395 &salt, sizeof(salt),
396 NULL, 0));
397 return key;
398}
399
400
405{
410
414 struct KeyEntry *k;
415};
416
417
428static int
430 uint32_t key,
431 void *value)
432{
433 struct GetElementContext *ctx = cls;
434 struct KeyEntry *k = value;
435
436 GNUNET_assert (NULL != k);
438 &ctx->hash))
439 {
440 ctx->k = k;
441 return GNUNET_NO;
442 }
443 return GNUNET_YES;
444}
445
446
455static struct KeyEntry *
457 const struct GNUNET_HashCode *element_hash)
458{
459 int ret;
460 struct IBF_Key ibf_key;
461 struct GetElementContext ctx = { { { 0 } }, 0 };
462
463 ctx.hash = *element_hash;
464
465 ibf_key = get_ibf_key (element_hash);
466 ret = GNUNET_CONTAINER_multihashmap32_get_multiple (op->state->key_to_element,
467 (uint32_t) ibf_key.key_val,
469 &ctx);
470
471 /* was the iteration aborted because we found the element? */
472 if (GNUNET_SYSERR == ret)
473 {
474 GNUNET_assert (NULL != ctx.k);
475 return ctx.k;
476 }
477 return NULL;
478}
479
480
495static void
497 struct ElementEntry *ee,
498 int received)
499{
500 struct IBF_Key ibf_key;
501 struct KeyEntry *k;
502
504 k = GNUNET_new (struct KeyEntry);
505 k->element = ee;
506 k->ibf_key = ibf_key;
507 k->received = received;
509 GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
510 (uint32_t) ibf_key.key_val,
511 k,
513}
514
515
519static void
520salt_key (const struct IBF_Key *k_in,
521 uint32_t salt,
522 struct IBF_Key *k_out)
523{
524 int s = salt % 64;
525 uint64_t x = k_in->key_val;
526
527 /* rotate ibf key */
528 x = (x >> s) | (x << (64 - s));
529 k_out->key_val = x;
530}
531
532
536static void
537unsalt_key (const struct IBF_Key *k_in,
538 uint32_t salt,
539 struct IBF_Key *k_out)
540{
541 int s = salt % 64;
542 uint64_t x = k_in->key_val;
543
544 x = (x << s) | (x >> (64 - s));
545 k_out->key_val = x;
546}
547
548
556static int
558 uint32_t key,
559 void *value)
560{
561 struct Operation *op = cls;
562 struct KeyEntry *ke = value;
563 struct IBF_Key salted_key;
564
566 "[OP %p] inserting %lx (hash %s) into ibf\n",
567 op,
568 (unsigned long) ke->ibf_key.key_val,
570 salt_key (&ke->ibf_key,
571 op->state->salt_send,
572 &salted_key);
573 ibf_insert (op->state->local_ibf, salted_key);
574 return GNUNET_YES;
575}
576
577
588static int
590 const struct GNUNET_HashCode *key,
591 void *value)
592{
593 struct Operation *op = cls;
594 struct ElementEntry *ee = value;
595
596 /* make sure that the element belongs to the set at the time
597 * of creating the operation */
598 if (GNUNET_NO ==
600 op))
601 return GNUNET_YES;
604 ee,
605 GNUNET_NO);
606 return GNUNET_YES;
607}
608
609
616static void
618{
619 unsigned int len;
620
621 GNUNET_assert (NULL == op->state->key_to_element);
622 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
623 op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
624 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
626 op);
627}
628
629
638static int
640 uint32_t size)
641{
642 GNUNET_assert (NULL != op->state->key_to_element);
643
644 if (NULL != op->state->local_ibf)
645 ibf_destroy (op->state->local_ibf);
646 op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
647 if (NULL == op->state->local_ibf)
648 {
650 "Failed to allocate local IBF\n");
651 return GNUNET_SYSERR;
652 }
653 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
655 op);
656 return GNUNET_OK;
657}
658
659
669static int
671 uint16_t ibf_order)
672{
673 unsigned int buckets_sent = 0;
674 struct InvertibleBloomFilter *ibf;
675
676 if (GNUNET_OK !=
677 prepare_ibf (op, 1 << ibf_order))
678 {
679 /* allocation failed */
680 return GNUNET_SYSERR;
681 }
682
684 "sending ibf of size %u\n",
685 1 << ibf_order);
686
687 {
688 char name[64] = { 0 };
689 snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
691 }
692
693 ibf = op->state->local_ibf;
694
695 while (buckets_sent < (1 << ibf_order))
696 {
697 unsigned int buckets_in_message;
698 struct GNUNET_MQ_Envelope *ev;
699 struct IBFMessage *msg;
700
701 buckets_in_message = (1 << ibf_order) - buckets_sent;
702 /* limit to maximum */
703 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
704 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
705
707 buckets_in_message * IBF_BUCKET_SIZE,
709 msg->reserved1 = 0;
710 msg->reserved2 = 0;
711 msg->order = ibf_order;
712 msg->offset = htonl (buckets_sent);
713 msg->salt = htonl (op->state->salt_send);
714 ibf_write_slice (ibf, buckets_sent,
715 buckets_in_message, &msg[1]);
716 buckets_sent += buckets_in_message;
718 "ibf chunk size %u, %u/%u sent\n",
719 buckets_in_message,
720 buckets_sent,
721 1 << ibf_order);
722 GNUNET_MQ_send (op->mq, ev);
723 }
724
725 /* The other peer must decode the IBF, so
726 * we're passive. */
727 op->state->phase = PHASE_INVENTORY_PASSIVE;
728 return GNUNET_OK;
729}
730
731
739static unsigned int
740get_order_from_difference (unsigned int diff)
741{
742 unsigned int ibf_order;
743
744 ibf_order = 2;
745 while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
746 ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
747 (ibf_order < MAX_IBF_ORDER))
748 ibf_order++;
749 // add one for correction
750 return ibf_order + 1;
751}
752
753
763static int
765 const struct GNUNET_HashCode *key,
766 void *value)
767{
768 struct Operation *op = cls;
769 struct GNUNET_SET_ElementMessage *emsg;
770 struct ElementEntry *ee = value;
771 struct GNUNET_SET_Element *el = &ee->element;
772 struct GNUNET_MQ_Envelope *ev;
773
775 "Sending element %s\n",
776 GNUNET_h2s (key));
777 ev = GNUNET_MQ_msg_extra (emsg,
778 el->size,
780 emsg->element_type = htons (el->element_type);
781 GNUNET_memcpy (&emsg[1],
782 el->data,
783 el->size);
784 GNUNET_MQ_send (op->mq,
785 ev);
786 return GNUNET_YES;
787}
788
789
795static void
797{
798 struct GNUNET_MQ_Envelope *ev;
799
800 op->state->phase = PHASE_FULL_SENDING;
802 "Dedicing to transmit the full set\n");
803 /* FIXME: use a more memory-friendly way of doing this with an
804 iterator, just as we do in the non-full case! */
805 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
807 op);
809 GNUNET_MQ_send (op->mq,
810 ev);
811}
812
813
820int
822 const struct StrataEstimatorMessage *msg)
823{
824 struct Operation *op = cls;
825 int is_compressed;
826 size_t len;
827
828 if (op->state->phase != PHASE_EXPECT_SE)
829 {
830 GNUNET_break (0);
831 return GNUNET_SYSERR;
832 }
833 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
834 msg->header.type));
835 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
836 if ((GNUNET_NO == is_compressed) &&
838 {
839 GNUNET_break (0);
840 return GNUNET_SYSERR;
841 }
842 return GNUNET_OK;
843}
844
845
852void
854 const struct StrataEstimatorMessage *msg)
855{
856 struct Operation *op = cls;
857 struct StrataEstimator *remote_se;
858 unsigned int diff;
859 uint64_t other_size;
860 size_t len;
861 int is_compressed;
862
863 is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (
864 msg->header.type));
866 "# bytes of SE received",
867 ntohs (msg->header.size),
868 GNUNET_NO);
869 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
870 other_size = GNUNET_ntohll (msg->set_size);
874 if (NULL == remote_se)
875 {
876 /* insufficient resources, fail */
878 return;
879 }
880 if (GNUNET_OK !=
882 len,
883 is_compressed,
884 remote_se))
885 {
886 /* decompression failed */
887 strata_estimator_destroy (remote_se);
889 return;
890 }
891 GNUNET_assert (NULL != op->state->se);
892 diff = strata_estimator_difference (remote_se,
893 op->state->se);
894
895 if (diff > 200)
896 diff = diff * 3 / 2;
897
898 strata_estimator_destroy (remote_se);
899 strata_estimator_destroy (op->state->se);
900 op->state->se = NULL;
902 "got se diff=%d, using ibf size %d\n",
903 diff,
904 1U << get_order_from_difference (diff));
905
906 {
907 char *set_debug;
908
909 set_debug = getenv ("GNUNET_SET_BENCHMARK");
910 if ((NULL != set_debug) &&
911 (0 == strcmp (set_debug, "1")))
912 {
913 FILE *f = fopen ("set.log", "a");
914 fprintf (f, "%llu\n", (unsigned long long) diff);
915 fclose (f);
916 }
917 }
918
919 if ((GNUNET_YES == op->byzantine) &&
920 (other_size < op->byzantine_lower_bound))
921 {
922 GNUNET_break (0);
924 return;
925 }
926
927 if ((GNUNET_YES == op->force_full) ||
928 (diff > op->state->initial_size / 4) ||
929 (0 == other_size))
930 {
932 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
933 diff,
934 (unsigned long long) op->state->initial_size);
936 "# of full sends",
937 1,
938 GNUNET_NO);
939 if ((op->state->initial_size <= other_size) ||
940 (0 == other_size))
941 {
943 }
944 else
945 {
946 struct GNUNET_MQ_Envelope *ev;
947
949 "Telling other peer that we expect its full set\n");
950 op->state->phase = PHASE_EXPECT_IBF;
953 GNUNET_MQ_send (op->mq,
954 ev);
955 }
956 }
957 else
958 {
960 "# of ibf sends",
961 1,
962 GNUNET_NO);
963 if (GNUNET_OK !=
964 send_ibf (op,
966 {
967 /* Internal error, best we can do is shut the connection */
969 "Failed to send IBF, closing connection\n");
971 return;
972 }
973 }
974 GNUNET_CADET_receive_done (op->channel);
975}
976
977
985static int
987 uint32_t key,
988 void *value)
989{
990 struct SendElementClosure *sec = cls;
991 struct Operation *op = sec->op;
992 struct KeyEntry *ke = value;
993 struct GNUNET_MQ_Envelope *ev;
994 struct GNUNET_MessageHeader *mh;
995
996 /* Detect 32-bit key collision for the 64-bit IBF keys. */
997 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
998 return GNUNET_YES;
999
1001 sizeof(struct GNUNET_HashCode),
1003
1004 GNUNET_assert (NULL != ev);
1005 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1007 "[OP %p] sending element offer (%s) to peer\n",
1008 op,
1010 GNUNET_MQ_send (op->mq, ev);
1011 return GNUNET_YES;
1012}
1013
1014
1021static void
1023 struct IBF_Key ibf_key)
1024{
1025 struct SendElementClosure send_cls;
1026
1027 send_cls.ibf_key = ibf_key;
1028 send_cls.op = op;
1030 op->state->key_to_element,
1031 (uint32_t) ibf_key.
1032 key_val,
1034 &send_cls);
1035}
1036
1037
1045static int
1047{
1048 struct IBF_Key key;
1049 struct IBF_Key last_key;
1050 int side;
1051 unsigned int num_decoded;
1052 struct InvertibleBloomFilter *diff_ibf;
1053
1054 GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
1055
1056 if (GNUNET_OK !=
1057 prepare_ibf (op,
1058 op->state->remote_ibf->size))
1059 {
1060 GNUNET_break (0);
1061 /* allocation failed */
1062 return GNUNET_SYSERR;
1063 }
1064 diff_ibf = ibf_dup (op->state->local_ibf);
1065 ibf_subtract (diff_ibf,
1066 op->state->remote_ibf);
1067
1068 ibf_destroy (op->state->remote_ibf);
1069 op->state->remote_ibf = NULL;
1070
1072 "decoding IBF (size=%u)\n",
1073 diff_ibf->size);
1074
1075 num_decoded = 0;
1076 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1077
1078 while (1)
1079 {
1080 int res;
1081 int cycle_detected = GNUNET_NO;
1082
1083 last_key = key;
1084
1085 res = ibf_decode (diff_ibf, &side, &key);
1086 if (res == GNUNET_OK)
1087 {
1089 "decoded ibf key %lx\n",
1090 (unsigned long) key.key_val);
1091 num_decoded += 1;
1092 if ((num_decoded > diff_ibf->size) ||
1093 ((num_decoded > 1) &&
1094 (last_key.key_val == key.key_val)))
1095 {
1097 "detected cyclic ibf (decoded %u/%u)\n",
1098 num_decoded,
1099 diff_ibf->size);
1100 cycle_detected = GNUNET_YES;
1101 }
1102 }
1103 if ((GNUNET_SYSERR == res) ||
1104 (GNUNET_YES == cycle_detected))
1105 {
1106 int next_order;
1107 next_order = 0;
1108 while (1 << next_order < diff_ibf->size)
1109 next_order++;
1110 next_order++;
1111 if (next_order <= MAX_IBF_ORDER)
1112 {
1114 "decoding failed, sending larger ibf (size %u)\n",
1115 1 << next_order);
1117 "# of IBF retries",
1118 1,
1119 GNUNET_NO);
1120 op->state->salt_send++;
1121 if (GNUNET_OK !=
1122 send_ibf (op, next_order))
1123 {
1124 /* Internal error, best we can do is shut the connection */
1126 "Failed to send IBF, closing connection\n");
1128 ibf_destroy (diff_ibf);
1129 return GNUNET_SYSERR;
1130 }
1131 }
1132 else
1133 {
1135 "# of failed union operations (too large)",
1136 1,
1137 GNUNET_NO);
1138 // XXX: Send the whole set, element-by-element
1140 "set union failed: reached ibf limit\n");
1142 ibf_destroy (diff_ibf);
1143 return GNUNET_SYSERR;
1144 }
1145 break;
1146 }
1147 if (GNUNET_NO == res)
1148 {
1149 struct GNUNET_MQ_Envelope *ev;
1150
1152 "transmitted all values, sending DONE\n");
1154 GNUNET_MQ_send (op->mq, ev);
1155 /* We now wait until we get a DONE message back
1156 * and then wait for our MQ to be flushed and all our
1157 * demands be delivered. */
1158 break;
1159 }
1160 if (1 == side)
1161 {
1162 struct IBF_Key unsalted_key;
1163
1164 unsalt_key (&key,
1165 op->state->salt_receive,
1166 &unsalted_key);
1168 unsalted_key);
1169 }
1170 else if (-1 == side)
1171 {
1172 struct GNUNET_MQ_Envelope *ev;
1173 struct InquiryMessage *msg;
1174
1175 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1176 * the effort additional complexity. */
1178 sizeof(struct IBF_Key),
1180 msg->salt = htonl (op->state->salt_receive);
1181 GNUNET_memcpy (&msg[1],
1182 &key,
1183 sizeof(struct IBF_Key));
1185 "sending element inquiry for IBF key %lx\n",
1186 (unsigned long) key.key_val);
1187 GNUNET_MQ_send (op->mq, ev);
1188 }
1189 else
1190 {
1191 GNUNET_assert (0);
1192 }
1193 }
1194 ibf_destroy (diff_ibf);
1195 return GNUNET_OK;
1196}
1197
1198
1209int
1211 const struct IBFMessage *msg)
1212{
1213 struct Operation *op = cls;
1214 unsigned int buckets_in_message;
1215
1216 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1217 {
1218 GNUNET_break_op (0);
1219 return GNUNET_SYSERR;
1220 }
1221 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1223 if (0 == buckets_in_message)
1224 {
1225 GNUNET_break_op (0);
1226 return GNUNET_SYSERR;
1227 }
1228 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1230 {
1231 GNUNET_break_op (0);
1232 return GNUNET_SYSERR;
1233 }
1234 if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1235 {
1236 if (ntohl (msg->offset) != op->state->ibf_buckets_received)
1237 {
1238 GNUNET_break_op (0);
1239 return GNUNET_SYSERR;
1240 }
1241 if (1 << msg->order != op->state->remote_ibf->size)
1242 {
1243 GNUNET_break_op (0);
1244 return GNUNET_SYSERR;
1245 }
1246 if (ntohl (msg->salt) != op->state->salt_receive)
1247 {
1248 GNUNET_break_op (0);
1249 return GNUNET_SYSERR;
1250 }
1251 }
1252 else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1253 (op->state->phase != PHASE_EXPECT_IBF))
1254 {
1255 GNUNET_break_op (0);
1256 return GNUNET_SYSERR;
1257 }
1258
1259 return GNUNET_OK;
1260}
1261
1262
1272void
1274 const struct IBFMessage *msg)
1275{
1276 struct Operation *op = cls;
1277 unsigned int buckets_in_message;
1278
1279 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1281 if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1282 (op->state->phase == PHASE_EXPECT_IBF))
1283 {
1284 op->state->phase = PHASE_EXPECT_IBF_CONT;
1285 GNUNET_assert (NULL == op->state->remote_ibf);
1287 "Creating new ibf of size %u\n",
1288 1 << msg->order);
1289 op->state->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1290 op->state->salt_receive = ntohl (msg->salt);
1292 "Receiving new IBF with salt %u\n",
1293 op->state->salt_receive);
1294 if (NULL == op->state->remote_ibf)
1295 {
1297 "Failed to parse remote IBF, closing connection\n");
1299 return;
1300 }
1301 op->state->ibf_buckets_received = 0;
1302 if (0 != ntohl (msg->offset))
1303 {
1304 GNUNET_break_op (0);
1306 return;
1307 }
1308 }
1309 else
1310 {
1311 GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
1313 "Received more of IBF\n");
1314 }
1315 GNUNET_assert (NULL != op->state->remote_ibf);
1316
1317 ibf_read_slice (&msg[1],
1318 op->state->ibf_buckets_received,
1319 buckets_in_message,
1320 op->state->remote_ibf);
1321 op->state->ibf_buckets_received += buckets_in_message;
1322
1323 if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1324 {
1326 "received full ibf\n");
1327 op->state->phase = PHASE_INVENTORY_ACTIVE;
1328 if (GNUNET_OK !=
1330 {
1331 /* Internal error, best we can do is shut down */
1333 "Failed to decode IBF, closing connection\n");
1335 return;
1336 }
1337 }
1338 GNUNET_CADET_receive_done (op->channel);
1339}
1340
1341
1350static void
1352 struct GNUNET_SET_Element *element,
1353 int status)
1354{
1355 struct GNUNET_MQ_Envelope *ev;
1356 struct GNUNET_SET_ResultMessage *rm;
1357
1359 "sending element (size %u) to client\n",
1360 element->size);
1361 GNUNET_assert (0 != op->client_request_id);
1363 if (NULL == ev)
1364 {
1365 GNUNET_MQ_discard (ev);
1366 GNUNET_break (0);
1367 return;
1368 }
1369 rm->result_status = htons (status);
1370 rm->request_id = htonl (op->client_request_id);
1371 rm->element_type = htons (element->element_type);
1373 op->state->key_to_element));
1374 GNUNET_memcpy (&rm[1],
1375 element->data,
1376 element->size);
1377 GNUNET_MQ_send (op->set->cs->mq,
1378 ev);
1379}
1380
1381
1388static void
1390{
1391 struct Operation *op = cls;
1392 struct GNUNET_MQ_Envelope *ev;
1393 struct GNUNET_SET_ResultMessage *rm;
1394
1395 if (GNUNET_YES == op->state->client_done_sent)
1396 {
1397 return;
1398 }
1399
1400 if (PHASE_DONE != op->state->phase)
1401 {
1403 "Union operation failed\n");
1405 "# Union operations failed",
1406 1,
1407 GNUNET_NO);
1410 rm->request_id = htonl (op->client_request_id);
1411 rm->element_type = htons (0);
1412 GNUNET_MQ_send (op->set->cs->mq,
1413 ev);
1414 return;
1415 }
1416
1417 op->state->client_done_sent = GNUNET_YES;
1418
1420 "# Union operations succeeded",
1421 1,
1422 GNUNET_NO);
1424 "Signalling client that union operation is done\n");
1425 ev = GNUNET_MQ_msg (rm,
1427 rm->request_id = htonl (op->client_request_id);
1429 rm->element_type = htons (0);
1431 op->state->key_to_element));
1432 GNUNET_MQ_send (op->set->cs->mq,
1433 ev);
1434}
1435
1436
1442static void
1444{
1445 unsigned int num_demanded;
1446
1447 num_demanded = GNUNET_CONTAINER_multihashmap_size (
1448 op->state->demanded_hashes);
1449
1450 if (PHASE_FINISH_WAITING == op->state->phase)
1451 {
1453 "In PHASE_FINISH_WAITING, pending %u demands\n",
1454 num_demanded);
1455 if (0 == num_demanded)
1456 {
1457 struct GNUNET_MQ_Envelope *ev;
1458
1459 op->state->phase = PHASE_DONE;
1461 GNUNET_MQ_send (op->mq,
1462 ev);
1463 /* We now wait until the other peer sends P2P_OVER
1464 * after it got all elements from us. */
1465 }
1466 }
1467 if (PHASE_FINISH_CLOSING == op->state->phase)
1468 {
1470 "In PHASE_FINISH_CLOSING, pending %u demands\n",
1471 num_demanded);
1472 if (0 == num_demanded)
1473 {
1474 op->state->phase = PHASE_DONE;
1477 }
1478 }
1479}
1480
1481
1488int
1490 const struct GNUNET_SET_ElementMessage *emsg)
1491{
1492 struct Operation *op = cls;
1493
1494 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1495 {
1496 GNUNET_break_op (0);
1497 return GNUNET_SYSERR;
1498 }
1499 if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
1500 {
1501 GNUNET_break_op (0);
1502 return GNUNET_SYSERR;
1503 }
1504 return GNUNET_OK;
1505}
1506
1507
1516void
1518 const struct GNUNET_SET_ElementMessage *emsg)
1519{
1520 struct Operation *op = cls;
1521 struct ElementEntry *ee;
1522 struct KeyEntry *ke;
1523 uint16_t element_size;
1524
1525 element_size = ntohs (emsg->header.size) - sizeof(struct
1527 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1528 GNUNET_memcpy (&ee[1],
1529 &emsg[1],
1530 element_size);
1531 ee->element.size = element_size;
1532 ee->element.data = &ee[1];
1533 ee->element.element_type = ntohs (emsg->element_type);
1534 ee->remote = GNUNET_YES;
1536 &ee->element_hash);
1537 if (GNUNET_NO ==
1538 GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
1539 &ee->element_hash,
1540 NULL))
1541 {
1542 /* We got something we didn't demand, since it's not in our map. */
1543 GNUNET_break_op (0);
1545 return;
1546 }
1547
1549 "Got element (size %u, hash %s) from peer\n",
1550 (unsigned int) element_size,
1551 GNUNET_h2s (&ee->element_hash));
1552
1554 "# received elements",
1555 1,
1556 GNUNET_NO);
1558 "# exchanged elements",
1559 1,
1560 GNUNET_NO);
1561
1562 op->state->received_total++;
1563
1564 ke = op_get_element (op, &ee->element_hash);
1565 if (NULL != ke)
1566 {
1567 /* Got repeated element. Should not happen since
1568 * we track demands. */
1570 "# repeated elements",
1571 1,
1572 GNUNET_NO);
1573 ke->received = GNUNET_YES;
1574 GNUNET_free (ee);
1575 }
1576 else
1577 {
1579 "Registering new element from remote peer\n");
1580 op->state->received_fresh++;
1582 /* only send results immediately if the client wants it */
1583 switch (op->result_mode)
1584 {
1587 break;
1588
1591 break;
1592
1593 default:
1594 /* Result mode not supported, should have been caught earlier. */
1595 GNUNET_break (0);
1596 break;
1597 }
1598 }
1599
1600 if ((op->state->received_total > 8) &&
1601 (op->state->received_fresh < op->state->received_total / 3))
1602 {
1603 /* The other peer gave us lots of old elements, there's something wrong. */
1604 GNUNET_break_op (0);
1606 return;
1607 }
1608 GNUNET_CADET_receive_done (op->channel);
1609 maybe_finish (op);
1610}
1611
1612
1619int
1621 const struct GNUNET_SET_ElementMessage *emsg)
1622{
1623 struct Operation *op = cls;
1624
1625 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1626 {
1627 GNUNET_break_op (0);
1628 return GNUNET_SYSERR;
1629 }
1630 // FIXME: check that we expect full elements here?
1631 return GNUNET_OK;
1632}
1633
1634
1641void
1643 const struct GNUNET_SET_ElementMessage *emsg)
1644{
1645 struct Operation *op = cls;
1646 struct ElementEntry *ee;
1647 struct KeyEntry *ke;
1648 uint16_t element_size;
1649
1650 element_size = ntohs (emsg->header.size) - sizeof(struct
1652 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1653 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
1654 ee->element.size = element_size;
1655 ee->element.data = &ee[1];
1656 ee->element.element_type = ntohs (emsg->element_type);
1657 ee->remote = GNUNET_YES;
1659
1661 "Got element (full diff, size %u, hash %s) from peer\n",
1662 (unsigned int) element_size,
1663 GNUNET_h2s (&ee->element_hash));
1664
1666 "# received elements",
1667 1,
1668 GNUNET_NO);
1670 "# exchanged elements",
1671 1,
1672 GNUNET_NO);
1673
1674 op->state->received_total++;
1675
1676 ke = op_get_element (op, &ee->element_hash);
1677 if (NULL != ke)
1678 {
1679 /* Got repeated element. Should not happen since
1680 * we track demands. */
1682 "# repeated elements",
1683 1,
1684 GNUNET_NO);
1685 ke->received = GNUNET_YES;
1686 GNUNET_free (ee);
1687 }
1688 else
1689 {
1691 "Registering new element from remote peer\n");
1692 op->state->received_fresh++;
1694 /* only send results immediately if the client wants it */
1695 switch (op->result_mode)
1696 {
1699 break;
1700
1703 break;
1704
1705 default:
1706 /* Result mode not supported, should have been caught earlier. */
1707 GNUNET_break (0);
1708 break;
1709 }
1710 }
1711
1712 if ((GNUNET_YES == op->byzantine) &&
1713 (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1714 (op->state->received_fresh < op->state->received_total / 6))
1715 {
1716 /* The other peer gave us lots of old elements, there's something wrong. */
1718 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1719 (unsigned long long) op->state->received_fresh,
1720 (unsigned long long) op->state->received_total);
1721 GNUNET_break_op (0);
1723 return;
1724 }
1725 GNUNET_CADET_receive_done (op->channel);
1726}
1727
1728
1736int
1738 const struct InquiryMessage *msg)
1739{
1740 struct Operation *op = cls;
1741 unsigned int num_keys;
1742
1743 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1744 {
1745 GNUNET_break_op (0);
1746 return GNUNET_SYSERR;
1747 }
1748 if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1749 {
1750 GNUNET_break_op (0);
1751 return GNUNET_SYSERR;
1752 }
1753 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1754 / sizeof(struct IBF_Key);
1755 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1756 != num_keys * sizeof(struct IBF_Key))
1757 {
1758 GNUNET_break_op (0);
1759 return GNUNET_SYSERR;
1760 }
1761 return GNUNET_OK;
1762}
1763
1764
1772void
1774 const struct InquiryMessage *msg)
1775{
1776 struct Operation *op = cls;
1777 const struct IBF_Key *ibf_key;
1778 unsigned int num_keys;
1779
1781 "Received union inquiry\n");
1782 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
1783 / sizeof(struct IBF_Key);
1784 ibf_key = (const struct IBF_Key *) &msg[1];
1785 while (0 != num_keys--)
1786 {
1787 struct IBF_Key unsalted_key;
1788
1789 unsalt_key (ibf_key,
1790 ntohl (msg->salt),
1791 &unsalted_key);
1793 unsalted_key);
1794 ibf_key++;
1795 }
1796 GNUNET_CADET_receive_done (op->channel);
1797}
1798
1799
1810static int
1812 uint32_t key,
1813 void *value)
1814{
1815 struct Operation *op = cls;
1816 struct KeyEntry *ke = value;
1817 struct GNUNET_MQ_Envelope *ev;
1818 struct GNUNET_SET_ElementMessage *emsg;
1819 struct ElementEntry *ee = ke->element;
1820
1821 if (GNUNET_YES == ke->received)
1822 return GNUNET_YES;
1823 ev = GNUNET_MQ_msg_extra (emsg,
1824 ee->element.size,
1826 GNUNET_memcpy (&emsg[1],
1827 ee->element.data,
1828 ee->element.size);
1829 emsg->element_type = htons (ee->element.element_type);
1830 GNUNET_MQ_send (op->mq,
1831 ev);
1832 return GNUNET_YES;
1833}
1834
1835
1842void
1844 const struct GNUNET_MessageHeader *mh)
1845{
1846 struct Operation *op = cls;
1847
1849 "Received request for full set transmission\n");
1850 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1851 {
1852 GNUNET_break_op (0);
1854 return;
1855 }
1856 if (PHASE_EXPECT_IBF != op->state->phase)
1857 {
1858 GNUNET_break_op (0);
1860 return;
1861 }
1862
1863 // FIXME: we need to check that our set is larger than the
1864 // byzantine_lower_bound by some threshold
1865 send_full_set (op);
1866 GNUNET_CADET_receive_done (op->channel);
1867}
1868
1869
1876void
1878 const struct GNUNET_MessageHeader *mh)
1879{
1880 struct Operation *op = cls;
1881
1882 switch (op->state->phase)
1883 {
1884 case PHASE_EXPECT_IBF:
1885 {
1886 struct GNUNET_MQ_Envelope *ev;
1887
1889 "got FULL DONE, sending elements that other peer is missing\n");
1890
1891 /* send all the elements that did not come from the remote peer */
1892 GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
1894 op);
1895
1897 GNUNET_MQ_send (op->mq,
1898 ev);
1899 op->state->phase = PHASE_DONE;
1900 /* we now wait until the other peer sends us the OVER message*/
1901 }
1902 break;
1903
1904 case PHASE_FULL_SENDING:
1905 {
1907 "got FULL DONE, finishing\n");
1908 /* We sent the full set, and got the response for that. We're done. */
1909 op->state->phase = PHASE_DONE;
1910 GNUNET_CADET_receive_done (op->channel);
1913 return;
1914 }
1915 break;
1916
1917 default:
1919 "Handle full done phase is %u\n",
1920 (unsigned) op->state->phase);
1921 GNUNET_break_op (0);
1923 return;
1924 }
1925 GNUNET_CADET_receive_done (op->channel);
1926}
1927
1928
1937int
1939 const struct GNUNET_MessageHeader *mh)
1940{
1941 struct Operation *op = cls;
1942 unsigned int num_hashes;
1943
1944 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
1945 {
1946 GNUNET_break_op (0);
1947 return GNUNET_SYSERR;
1948 }
1949 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1950 / sizeof(struct GNUNET_HashCode);
1951 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1952 != num_hashes * sizeof(struct GNUNET_HashCode))
1953 {
1954 GNUNET_break_op (0);
1955 return GNUNET_SYSERR;
1956 }
1957 return GNUNET_OK;
1958}
1959
1960
1968void
1970 const struct GNUNET_MessageHeader *mh)
1971{
1972 struct Operation *op = cls;
1973 struct ElementEntry *ee;
1974 struct GNUNET_SET_ElementMessage *emsg;
1975 const struct GNUNET_HashCode *hash;
1976 unsigned int num_hashes;
1977 struct GNUNET_MQ_Envelope *ev;
1978
1979 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
1980 / sizeof(struct GNUNET_HashCode);
1981 for (hash = (const struct GNUNET_HashCode *) &mh[1];
1982 num_hashes > 0;
1983 hash++, num_hashes--)
1984 {
1985 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
1986 hash);
1987 if (NULL == ee)
1988 {
1989 /* Demand for non-existing element. */
1990 GNUNET_break_op (0);
1992 return;
1993 }
1995 {
1996 /* Probably confused lazily copied sets. */
1997 GNUNET_break_op (0);
1999 return;
2000 }
2001 ev = GNUNET_MQ_msg_extra (emsg, ee->element.size,
2003 GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
2004 emsg->reserved = htons (0);
2005 emsg->element_type = htons (ee->element.element_type);
2007 "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
2008 op,
2009 (unsigned int) ee->element.size,
2010 GNUNET_h2s (&ee->element_hash));
2011 GNUNET_MQ_send (op->mq, ev);
2013 "# exchanged elements",
2014 1,
2015 GNUNET_NO);
2016
2017 switch (op->result_mode)
2018 {
2020 /* Nothing to do. */
2021 break;
2022
2025 break;
2026
2027 default:
2028 /* Result mode not supported, should have been caught earlier. */
2029 GNUNET_break (0);
2030 break;
2031 }
2032 }
2033 GNUNET_CADET_receive_done (op->channel);
2034}
2035
2036
2044int
2046 const struct GNUNET_MessageHeader *mh)
2047{
2048 struct Operation *op = cls;
2049 unsigned int num_hashes;
2050
2051 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2052 {
2053 GNUNET_break_op (0);
2054 return GNUNET_SYSERR;
2055 }
2056 /* look up elements and send them */
2057 if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2058 (op->state->phase != PHASE_INVENTORY_ACTIVE))
2059 {
2060 GNUNET_break_op (0);
2061 return GNUNET_SYSERR;
2062 }
2063 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2064 / sizeof(struct GNUNET_HashCode);
2065 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2066 num_hashes * sizeof(struct GNUNET_HashCode))
2067 {
2068 GNUNET_break_op (0);
2069 return GNUNET_SYSERR;
2070 }
2071 return GNUNET_OK;
2072}
2073
2074
2082void
2084 const struct GNUNET_MessageHeader *mh)
2085{
2086 struct Operation *op = cls;
2087 const struct GNUNET_HashCode *hash;
2088 unsigned int num_hashes;
2089
2090 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2091 / sizeof(struct GNUNET_HashCode);
2092 for (hash = (const struct GNUNET_HashCode *) &mh[1];
2093 num_hashes > 0;
2094 hash++, num_hashes--)
2095 {
2096 struct ElementEntry *ee;
2097 struct GNUNET_MessageHeader *demands;
2098 struct GNUNET_MQ_Envelope *ev;
2099
2100 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
2101 hash);
2102 if (NULL != ee)
2104 continue;
2105
2106 if (GNUNET_YES ==
2107 GNUNET_CONTAINER_multihashmap_contains (op->state->demanded_hashes,
2108 hash))
2109 {
2111 "Skipped sending duplicate demand\n");
2112 continue;
2113 }
2114
2117 op->state->demanded_hashes,
2118 hash,
2119 NULL,
2121
2123 "[OP %p] Requesting element (hash %s)\n",
2124 op, GNUNET_h2s (hash));
2125 ev = GNUNET_MQ_msg_header_extra (demands,
2126 sizeof(struct GNUNET_HashCode),
2128 GNUNET_memcpy (&demands[1],
2129 hash,
2130 sizeof(struct GNUNET_HashCode));
2131 GNUNET_MQ_send (op->mq, ev);
2132 }
2133 GNUNET_CADET_receive_done (op->channel);
2134}
2135
2136
2143void
2145 const struct GNUNET_MessageHeader *mh)
2146{
2147 struct Operation *op = cls;
2148
2149 if (GNUNET_SET_OPERATION_UNION != op->set->operation)
2150 {
2151 GNUNET_break_op (0);
2153 return;
2154 }
2155 switch (op->state->phase)
2156 {
2158 /* We got all requests, but still have to send our elements in response. */
2159 op->state->phase = PHASE_FINISH_WAITING;
2160
2162 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2163 /* The active peer is done sending offers
2164 * and inquiries. This means that all
2165 * our responses to that (demands and offers)
2166 * must be in flight (queued or in mesh).
2167 *
2168 * We should notify the active peer once
2169 * all our demands are satisfied, so that the active
2170 * peer can quit if we gave it everything.
2171 */GNUNET_CADET_receive_done (op->channel);
2172 maybe_finish (op);
2173 return;
2174
2177 "got DONE (as active partner), waiting to finish\n");
2178 /* All demands of the other peer are satisfied,
2179 * and we processed all offers, thus we know
2180 * exactly what our demands must be.
2181 *
2182 * We'll close the channel
2183 * to the other peer once our demands are met.
2184 */op->state->phase = PHASE_FINISH_CLOSING;
2185 GNUNET_CADET_receive_done (op->channel);
2186 maybe_finish (op);
2187 return;
2188
2189 default:
2190 GNUNET_break_op (0);
2192 return;
2193 }
2194}
2195
2196
2197void
2199 const struct GNUNET_MessageHeader *mh)
2200{
2201 send_client_done (cls);
2202}
2203
2204
2212static struct OperationState *
2214 const struct GNUNET_MessageHeader *opaque_context)
2215{
2216 struct OperationState *state;
2217 struct GNUNET_MQ_Envelope *ev;
2219
2222 opaque_context);
2223 if (NULL == ev)
2224 {
2225 /* the context message is too large */
2226 GNUNET_break (0);
2227 return NULL;
2228 }
2229 state = GNUNET_new (struct OperationState);
2230 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2231 GNUNET_NO);
2232 /* copy the current generation's strata estimator for this operation */
2233 state->se = strata_estimator_dup (op->set->state->se);
2234 /* we started the operation, thus we have to send the operation request */
2235 state->phase = PHASE_EXPECT_SE;
2236 state->salt_receive = state->salt_send = 42; // FIXME?????
2238 "Initiating union operation evaluation\n");
2240 "# of total union operations",
2241 1,
2242 GNUNET_NO);
2244 "# of initiated union operations",
2245 1,
2246 GNUNET_NO);
2247 msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
2248 GNUNET_MQ_send (op->mq,
2249 ev);
2250
2251 if (NULL != opaque_context)
2253 "sent op request with context message\n");
2254 else
2256 "sent op request without context message\n");
2257
2258 op->state = state;
2261 state->key_to_element);
2262 return state;
2263}
2264
2265
2272static struct OperationState *
2274{
2275 struct OperationState *state;
2276 const struct StrataEstimator *se;
2277 struct GNUNET_MQ_Envelope *ev;
2278 struct StrataEstimatorMessage *strata_msg;
2279 char *buf;
2280 size_t len;
2281 uint16_t type;
2282
2284 "accepting set union operation\n");
2286 "# of accepted union operations",
2287 1,
2288 GNUNET_NO);
2290 "# of total union operations",
2291 1,
2292 GNUNET_NO);
2293
2294 state = GNUNET_new (struct OperationState);
2295 state->se = strata_estimator_dup (op->set->state->se);
2296 state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
2297 GNUNET_NO);
2298 state->salt_receive = state->salt_send = 42; // FIXME?????
2299 op->state = state;
2302 state->key_to_element);
2303
2304 /* kick off the operation */
2305 se = state->se;
2307 len = strata_estimator_write (se,
2308 buf);
2309 if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2311 else
2313 ev = GNUNET_MQ_msg_extra (strata_msg,
2314 len,
2315 type);
2316 GNUNET_memcpy (&strata_msg[1],
2317 buf,
2318 len);
2319 GNUNET_free (buf);
2320 strata_msg->set_size
2322 op->set->content->elements));
2323 GNUNET_MQ_send (op->mq,
2324 ev);
2325 state->phase = PHASE_EXPECT_IBF;
2326 return state;
2327}
2328
2329
2338static struct SetState *
2340{
2341 struct SetState *set_state;
2342
2344 "union set created\n");
2345 set_state = GNUNET_new (struct SetState);
2348 if (NULL == set_state->se)
2349 {
2351 "Failed to allocate strata estimator\n");
2352 GNUNET_free (set_state);
2353 return NULL;
2354 }
2355 return set_state;
2356}
2357
2358
2365static void
2366union_add (struct SetState *set_state,
2367 struct ElementEntry *ee)
2368{
2369 strata_estimator_insert (set_state->se,
2370 get_ibf_key (&ee->element_hash));
2371}
2372
2373
2381static void
2382union_remove (struct SetState *set_state,
2383 struct ElementEntry *ee)
2384{
2385 strata_estimator_remove (set_state->se,
2386 get_ibf_key (&ee->element_hash));
2387}
2388
2389
2395static void
2396union_set_destroy (struct SetState *set_state)
2397{
2398 if (NULL != set_state->se)
2399 {
2400 strata_estimator_destroy (set_state->se);
2401 set_state->se = NULL;
2402 }
2403 GNUNET_free (set_state);
2404}
2405
2406
2413static struct SetState *
2415{
2416 struct SetState *new_state;
2417
2418 GNUNET_assert ((NULL != state) &&
2419 (NULL != state->se));
2420 new_state = GNUNET_new (struct SetState);
2421 new_state->se = strata_estimator_dup (state->se);
2422
2423 return new_state;
2424}
2425
2426
2432static void
2434{
2437 GNUNET_YES);
2438}
2439
2440
2447const struct SetVT *
2449{
2450 static const struct SetVT union_vt = {
2452 .add = &union_add,
2453 .remove = &union_remove,
2454 .destroy_set = &union_set_destroy,
2455 .evaluate = &union_evaluate,
2456 .accept = &union_accept,
2457 .cancel = &union_op_cancel,
2458 .copy_state = &union_copy_state,
2459 .channel_death = &union_channel_death
2460 };
2461
2462 return &union_vt;
2463}
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:357
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:229
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:324
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:291
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:168
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:380
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:80
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:404
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
char * getenv()
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static int ret
Final status code.
Definition: gnunet-arm.c:94
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_FS_Handle * ctx
static char * name
Name (label) of the records to list.
static struct GNUNET_IDENTITY_EgoLookup * el
Handle to identity lookup.
static char * res
Currently read line or NULL on EOF.
static char * value
Value of the record to add/remove.
static uint32_t type
Type string converted to DNS type value.
static int status
The program status; 0 for success.
Definition: gnunet-nse.c:39
enum State state
current state of profiling
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calcualations.
Definition: gnunet-scrypt.c:34
void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
void _GSS_operation_destroy(struct Operation *op, int gc)
Destroy the given operation.
common components for the implementation the different set operations
Peer-to-Peer messages for gnunet set.
void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
int check_union_p2p_elements(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Check an element message from a remote peer.
static void union_set_destroy(struct SetState *set_state)
Destroy a set that supports the union operation.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
static struct OperationState * union_accept(struct Operation *op)
Accept an union operation request from a remote peer.
static struct OperationState * union_evaluate(struct Operation *op, const struct GNUNET_MessageHeader *opaque_context)
Initiate operation to evaluate a set union with a remote peer.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
int check_union_p2p_full_element(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Check a full element message from a remote peer.
void handle_union_p2p_full_element(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static struct SetState * union_set_create(void)
Create a new set supporting the union operation.
const struct SetVT * _GSS_union_vt()
Get the table with implementing functions for set union.
#define SE_IBF_SIZE
Size of the IBFs in the strata estimator.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
static void union_remove(struct SetState *set_state, struct ElementEntry *ee)
Remove the element given in the element message from the set.
static int send_ibf(struct Operation *op, uint16_t ibf_order)
Send an ibf of appropriate size.
static struct SetState * union_copy_state(struct SetState *state)
Copy union-specific set state.
static void union_channel_death(struct Operation *op)
Handle case where channel went down for an operation.
#define IBF_ALPHA
Number of buckets used in the ibf per estimated difference.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void union_add(struct SetState *set_state, struct ElementEntry *ee)
Add the element from the given element message to the set.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
void handle_union_p2p_elements(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Handle an element message from a remote peer.
void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void union_op_cancel(struct Operation *op)
Destroy the union operation.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
#define LOG(kind,...)
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
UnionOperationPhase
Current phase we are in for a union operation.
@ PHASE_INVENTORY_PASSIVE
The other peer is decoding the IBF we just sent.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_DONE
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_EXPECT_IBF_CONT
Continuation for multi part IBFs.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
@ PHASE_INVENTORY_ACTIVE
We are decoding an IBF.
static void send_client_element(struct Operation *op, struct GNUNET_SET_Element *element, int status)
Send a result message to the client indicating that there is a new element.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
void handle_union_p2p_request_full(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request for full set transmission.
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
#define MAX_IBF_ORDER
The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
static unsigned int get_order_from_difference(unsigned int diff)
Compute the necessary order of an ibf from the size of the symmetric set difference.
two-peer set operations
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
void strata_estimator_remove(struct StrataEstimator *se, struct IBF_Key key)
Remove a key from the strata estimator.
API to create, modify and access statistics.
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition: cadet_api.c:872
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:221
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:70
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
, ' bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_log(kind,...)
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:54
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:37
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:63
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:87
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:78
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
Definition: gnunet_mq_lib.h:99
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
Request a set operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SET_RESULT
Create an empty set.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1184
@ GNUNET_SET_STATUS_OK
Everything went ok, we are transmitting an element of the result (in set, or to be removed from set,...
@ GNUNET_SET_STATUS_FAILURE
The other peer refused to to the operation with us, or something went wrong.
@ GNUNET_SET_STATUS_ADD_REMOTE
Element should be added to the result set of the remote peer, i.e.
@ GNUNET_SET_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SET_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
@ GNUNET_SET_RESULT_SYMMETRIC
Client gets notified of the required changes for both the local and the remote set.
@ GNUNET_SET_RESULT_ADDED
Client gets only elements that have been added to the set.
@ GNUNET_SET_OPERATION_UNION
Set union, return all elements that are in at least one of the sets.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
static unsigned int size
Size of the "table".
Definition: peer.c:68
Information about an element element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
struct GNUNET_HashCode element_hash
Hash of the element.
Internal representation of the hash map.
Internal representation of the hash map.
A 512-bit hashcode.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Message sent by client to the service to add or remove an element to/from the set.
Definition: set.h:281
uint16_t element_type
Type of the element to add or remove.
Definition: set.h:291
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_ADD or GNUNET_MESSAGE_TYPE_SET_REMOVE.
Definition: set.h:286
uint16_t reserved
For alignment, always zero.
Definition: set.h:296
Element stored in a set.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: set.h:245
uint32_t request_id
id the result belongs to
Definition: set.h:259
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SET_Status in NBO.
Definition: set.h:265
uint16_t element_type
Type of the element attached to the message, if any.
Definition: set.h:270
uint64_t current_size
Current set size.
Definition: set.h:254
Context for op_get_element_iterator.
struct GNUNET_HashCode hash
FIXME.
struct KeyEntry * k
FIXME.
Message containing buckets of an invertible bloom filter.
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:46
uint64_t key_val
Definition: ibf.h:47
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Invertible bloom filter (IBF).
Definition: ibf.h:83
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
The key entry is used to associate an ibf key with an element.
struct ElementEntry * element
The actual element associated with the key.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
State of an evaluate operation with another peer.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
enum UnionOperationPhase phase
Current state of the operation.
struct StrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
uint32_t received_total
Total number of elements received from the other peer.
uint32_t salt_send
Salt that we're using for sending IBFs.
uint64_t initial_size
Initial size of our set, just before the operation started.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
unsigned int ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
int client_done_sent
Did we send the client that we are done?
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet.
Operation context used to execute a set operation.
Used as a closure for sending elements with a specific IBF key.
struct Operation * op
Operation for which the elements should be sent.
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
Extra state required for efficient set intersection.
struct StrataEstimator * se
The strata estimator is only generated once for each set.
Dispatch table for a specific set operation.
SetCreateImpl create
Callback for the set creation.
Strata estimator together with the peer's overall set size.
uint64_t set_size
Size of the local set.
A handle to a strata estimator.
unsigned int ibf_size
Size of each IBF stratum (in bytes)
unsigned int strata_count
Size of the IBF array in strata.