GNUnet  0.10.x
gnunet-service-set_union.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
29 #include "gnunet-service-set.h"
30 #include "ibf.h"
34 #include <gcrypt.h>
35 
36 
/* Logging shorthand: forwards to GNUNET_log_from() with the fixed component
 * name "set-union". */
37 #define LOG(kind, ...) GNUNET_log_from(kind, "set-union", __VA_ARGS__)
38 
39 
/* NOTE(review): no use of SE_STRATA_COUNT is visible in this view; presumably
 * the number of strata of the strata estimator -- confirm at the use site. */
43 #define SE_STRATA_COUNT 32
44 
/* Passed as an argument when the remote strata estimator is set up in the
 * strata-estimator handler.  NOTE(review): the call line itself is not
 * visible here; presumably the per-stratum IBF size -- confirm. */
48 #define SE_IBF_SIZE 80
49 
/* Second argument given to ibf_create() for the remote IBF; also used by
 * get_order_from_difference() as a lower bound on the bucket count. */
53 #define SE_IBF_HASH_NUM 4
54 
/* Upper limit on the number of IBF buckets put into one IBF message by
 * send_ibf(); chosen so that the bucket payload
 * (buckets * IBF_BUCKET_SIZE bytes) does not exceed 1 << 15 bytes. */
58 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
59 
/* Largest IBF order used when growing the IBF (an IBF of order n has
 * 1 << n buckets): the loop in get_order_from_difference() stops here, and
 * a failed decode is only retried while the next order does not exceed it. */
65 #define MAX_IBF_ORDER (20)
66 
/* Number of IBF buckets requested per estimated differing element in
 * get_order_from_difference(). */
71 #define IBF_ALPHA 4
72 
73 
82 
93 
98 
103 
108 
114 
122 
129 
135 };
136 
137 
/**
 * Per-operation state of a set union operation.
 *
 * NOTE(review): several member declarations are absent from this view.  Code
 * elsewhere in the file accesses op->state->phase, remote_ibf, local_ibf, se,
 * key_to_element and demanded_hashes, so those members exist but their types
 * and documentation cannot be confirmed from here.
 */
141 struct OperationState {
147 
152 
157 
164 
169 
  /* Checked against GNUNET_YES at the start of the "client done" path, which
   * returns early when it is already set. */
173  int client_done_sent;
174 
  /* Number of buckets of the current remote IBF received so far: reset to 0
   * when a new remote IBF is started, increased by each message's bucket
   * count, compared with the offset of continuation messages and with
   * remote_ibf->size to detect completion. */
178  unsigned int ibf_buckets_received;
179 
184 
  /* Salt applied to our keys before inserting them into the local IBF and
   * transmitted in each IBF message; incremented before retrying with a
   * larger IBF after a failed decode. */
188  uint32_t salt_send;
189 
  /* Salt taken from the first message of the remote IBF; used to unsalt
   * decoded keys before offering elements and echoed in inquiry messages. */
193  uint32_t salt_receive;
194 
  /* Count of received elements that were not already known to us
   * (incremented on the "new element" branch of the element handlers). */
199  uint32_t received_fresh;
200 
  /* Count of all elements received from the other peer, including repeated
   * ones. */
204  uint32_t received_total;
205 
  /* Compared with the estimated difference and with the other peer's set size
   * when choosing between full transmission and IBF exchange.
   * NOTE(review): where this is assigned is not visible in this view;
   * presumably our own set size when the operation started -- confirm. */
210  uint64_t initial_size;
211 };
212 
213 
/**
 * Entry stored in the operation's key-to-element map, keyed by the low
 * 32 bits of the IBF key.
 *
 * NOTE(review): the declaration of the `element` member (accessed as
 * k->element elsewhere in the file) is absent from this view.
 */
217 struct KeyEntry {
  /* Full IBF key of the element, as computed by get_ibf_key() from the
   * element hash; compared in full to detect 32-bit map-key collisions. */
221  struct IBF_Key ibf_key;
222 
230 
  /* Initialised from the `received` argument when the entry is registered and
   * set to GNUNET_YES when the same element arrives again from the peer;
   * entries with GNUNET_YES are skipped when sending missing elements. */
237  int received;
238 };
239 
240 
250  struct IBF_Key ibf_key;
251 
256  struct Operation *op;
257 };
258 
259 
263 struct SetState {
271 };
272 
273 
284 static int
286  uint32_t key,
287  void *value)
288 {
289  struct KeyEntry *k = value;
290 
291  GNUNET_assert(NULL != k);
292  if (GNUNET_YES == k->element->remote)
293  {
294  GNUNET_free(k->element);
295  k->element = NULL;
296  }
297  GNUNET_free(k);
298  return GNUNET_YES;
299 }
300 
301 
308 static void
310 {
312  "destroying union op\n");
313  /* check if the op was canceled twice */
314  GNUNET_assert(NULL != op->state);
315  if (NULL != op->state->remote_ibf)
316  {
318  op->state->remote_ibf = NULL;
319  }
320  if (NULL != op->state->demanded_hashes)
321  {
323  op->state->demanded_hashes = NULL;
324  }
325  if (NULL != op->state->local_ibf)
326  {
328  op->state->local_ibf = NULL;
329  }
330  if (NULL != op->state->se)
331  {
333  op->state->se = NULL;
334  }
335  if (NULL != op->state->key_to_element)
336  {
339  NULL);
341  op->state->key_to_element = NULL;
342  }
343  GNUNET_free(op->state);
344  op->state = NULL;
346  "destroying union op done\n");
347 }
348 
349 
356 static void
358 {
359  struct GNUNET_MQ_Envelope *ev;
361 
363  "union operation failed\n");
366  msg->request_id = htonl(op->client_request_id);
367  msg->element_type = htons(0);
368  GNUNET_MQ_send(op->set->cs->mq,
369  ev);
371 }
372 
373 
/**
 * Derive the IBF key for an element from the element's hash.
 *
 * The key is produced by GNUNET_CRYPTO_kdf(), which is asked for sizeof(key)
 * bytes of output and is fed the hash and a constant zero 16-bit value as its
 * two input buffers.
 *
 * @param src hash of the element
 * @return the derived IBF key
 */
381 static struct IBF_Key
382 get_ibf_key(const struct GNUNET_HashCode *src)
383 {
384  struct IBF_Key key;
  /* constant input, so the result depends only on *src */
385  uint16_t salt = 0;
386 
  /* NOTE(review): the line opening the expression that the extra ')' below
   * closes is absent from this view; presumably an assertion on the return
   * value of the KDF call -- confirm against the full source. */
388  GNUNET_CRYPTO_kdf(&key, sizeof(key),
389  src, sizeof *src,
390  &salt, sizeof(salt),
391  NULL, 0));
392  return key;
393 }
394 
395 
403  struct GNUNET_HashCode hash;
404 
408  struct KeyEntry *k;
409 };
410 
411 
422 static int
424  uint32_t key,
425  void *value)
426 {
427  struct GetElementContext *ctx = cls;
428  struct KeyEntry *k = value;
429 
430  GNUNET_assert(NULL != k);
432  &ctx->hash))
433  {
434  ctx->k = k;
435  return GNUNET_NO;
436  }
437  return GNUNET_YES;
438 }
439 
440 
449 static struct KeyEntry *
451  const struct GNUNET_HashCode *element_hash)
452 {
453  int ret;
454  struct IBF_Key ibf_key;
455  struct GetElementContext ctx = { { { 0 } }, 0 };
456 
457  ctx.hash = *element_hash;
458 
459  ibf_key = get_ibf_key(element_hash);
461  (uint32_t)ibf_key.key_val,
463  &ctx);
464 
465  /* was the iteration aborted because we found the element? */
466  if (GNUNET_SYSERR == ret)
467  {
468  GNUNET_assert(NULL != ctx.k);
469  return ctx.k;
470  }
471  return NULL;
472 }
473 
474 
489 static void
491  struct ElementEntry *ee,
492  int received)
493 {
494  struct IBF_Key ibf_key;
495  struct KeyEntry *k;
496 
497  ibf_key = get_ibf_key(&ee->element_hash);
498  k = GNUNET_new(struct KeyEntry);
499  k->element = ee;
500  k->ibf_key = ibf_key;
501  k->received = received;
504  (uint32_t)ibf_key.key_val,
505  k,
507 }
508 
509 
513 static void
514 salt_key(const struct IBF_Key *k_in,
515  uint32_t salt,
516  struct IBF_Key *k_out)
517 {
518  int s = salt % 64;
519  uint64_t x = k_in->key_val;
520 
521  /* rotate ibf key */
522  x = (x >> s) | (x << (64 - s));
523  k_out->key_val = x;
524 }
525 
526 
530 static void
531 unsalt_key(const struct IBF_Key *k_in,
532  uint32_t salt,
533  struct IBF_Key *k_out)
534 {
535  int s = salt % 64;
536  uint64_t x = k_in->key_val;
537 
538  x = (x << s) | (x >> (64 - s));
539  k_out->key_val = x;
540 }
541 
542 
550 static int
552  uint32_t key,
553  void *value)
554 {
555  struct Operation *op = cls;
556  struct KeyEntry *ke = value;
557  struct IBF_Key salted_key;
558 
560  "[OP %x] inserting %lx (hash %s) into ibf\n",
561  (void *)op,
562  (unsigned long)ke->ibf_key.key_val,
564  salt_key(&ke->ibf_key,
565  op->state->salt_send,
566  &salted_key);
567  ibf_insert(op->state->local_ibf, salted_key);
568  return GNUNET_YES;
569 }
570 
571 
582 static int
584  const struct GNUNET_HashCode *key,
585  void *value)
586 {
587  struct Operation *op = cls;
588  struct ElementEntry *ee = value;
589 
590  /* make sure that the element belongs to the set at the time
591  * of creating the operation */
592  if (GNUNET_NO ==
594  op))
595  return GNUNET_YES;
598  ee,
599  GNUNET_NO);
600  return GNUNET_YES;
601 }
602 
603 
610 static void
612 {
613  unsigned int len;
614 
615  GNUNET_assert(NULL == op->state->key_to_element);
620  op);
621 }
622 
623 
632 static int
634  uint32_t size)
635 {
636  GNUNET_assert(NULL != op->state->key_to_element);
637 
638  if (NULL != op->state->local_ibf)
641  if (NULL == op->state->local_ibf)
642  {
644  "Failed to allocate local IBF\n");
645  return GNUNET_SYSERR;
646  }
649  op);
650  return GNUNET_OK;
651 }
652 
653 
663 static int
665  uint16_t ibf_order)
666 {
667  unsigned int buckets_sent = 0;
668  struct InvertibleBloomFilter *ibf;
669 
670  if (GNUNET_OK !=
671  prepare_ibf(op, 1 << ibf_order))
672  {
673  /* allocation failed */
674  return GNUNET_SYSERR;
675  }
676 
678  "sending ibf of size %u\n",
679  1 << ibf_order);
680 
681  {
682  char name[64] = { 0 };
683  snprintf(name, sizeof(name), "# sent IBF (order %u)", ibf_order);
685  }
686 
687  ibf = op->state->local_ibf;
688 
689  while (buckets_sent < (1 << ibf_order))
690  {
691  unsigned int buckets_in_message;
692  struct GNUNET_MQ_Envelope *ev;
693  struct IBFMessage *msg;
694 
695  buckets_in_message = (1 << ibf_order) - buckets_sent;
696  /* limit to maximum */
697  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
698  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
699 
700  ev = GNUNET_MQ_msg_extra(msg,
701  buckets_in_message * IBF_BUCKET_SIZE,
703  msg->reserved1 = 0;
704  msg->reserved2 = 0;
705  msg->order = ibf_order;
706  msg->offset = htonl(buckets_sent);
707  msg->salt = htonl(op->state->salt_send);
708  ibf_write_slice(ibf, buckets_sent,
709  buckets_in_message, &msg[1]);
710  buckets_sent += buckets_in_message;
712  "ibf chunk size %u, %u/%u sent\n",
713  buckets_in_message,
714  buckets_sent,
715  1 << ibf_order);
716  GNUNET_MQ_send(op->mq, ev);
717  }
718 
719  /* The other peer must decode the IBF, so
720  * we're passive. */
722  return GNUNET_OK;
723 }
724 
725 
733 static unsigned int
734 get_order_from_difference(unsigned int diff)
735 {
736  unsigned int ibf_order;
737 
738  ibf_order = 2;
739  while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
740  ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
741  (ibf_order < MAX_IBF_ORDER))
742  ibf_order++;
743  // add one for correction
744  return ibf_order + 1;
745 }
746 
747 
757 static int
759  const struct GNUNET_HashCode *key,
760  void *value)
761 {
762  struct Operation *op = cls;
763  struct GNUNET_SET_ElementMessage *emsg;
764  struct ElementEntry *ee = value;
765  struct GNUNET_SET_Element *el = &ee->element;
766  struct GNUNET_MQ_Envelope *ev;
767 
769  "Sending element %s\n",
770  GNUNET_h2s(key));
771  ev = GNUNET_MQ_msg_extra(emsg,
772  el->size,
774  emsg->element_type = htons(el->element_type);
775  GNUNET_memcpy(&emsg[1],
776  el->data,
777  el->size);
778  GNUNET_MQ_send(op->mq,
779  ev);
780  return GNUNET_YES;
781 }
782 
783 
789 static void
791 {
792  struct GNUNET_MQ_Envelope *ev;
793 
796  "Dedicing to transmit the full set\n");
797  /* FIXME: use a more memory-friendly way of doing this with an
798  iterator, just as we do in the non-full case! */
801  op);
803  GNUNET_MQ_send(op->mq,
804  ev);
805 }
806 
807 
814 int
816  const struct StrataEstimatorMessage *msg)
817 {
818  struct Operation *op = cls;
819  int is_compressed;
820  size_t len;
821 
822  if (op->state->phase != PHASE_EXPECT_SE)
823  {
824  GNUNET_break(0);
825  return GNUNET_SYSERR;
826  }
827  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
828  len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
829  if ((GNUNET_NO == is_compressed) &&
831  {
832  GNUNET_break(0);
833  return GNUNET_SYSERR;
834  }
835  return GNUNET_OK;
836 }
837 
838 
845 void
847  const struct StrataEstimatorMessage *msg)
848 {
849  struct Operation *op = cls;
850  struct StrataEstimator *remote_se;
851  unsigned int diff;
852  uint64_t other_size;
853  size_t len;
854  int is_compressed;
855 
856  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons(msg->header.type));
858  "# bytes of SE received",
859  ntohs(msg->header.size),
860  GNUNET_NO);
861  len = ntohs(msg->header.size) - sizeof(struct StrataEstimatorMessage);
862  other_size = GNUNET_ntohll(msg->set_size);
864  SE_IBF_SIZE,
866  if (NULL == remote_se)
867  {
868  /* insufficient resources, fail */
870  return;
871  }
872  if (GNUNET_OK !=
873  strata_estimator_read(&msg[1],
874  len,
875  is_compressed,
876  remote_se))
877  {
878  /* decompression failed */
879  strata_estimator_destroy(remote_se);
881  return;
882  }
883  GNUNET_assert(NULL != op->state->se);
884  diff = strata_estimator_difference(remote_se,
885  op->state->se);
886 
887  if (diff > 200)
888  diff = diff * 3 / 2;
889 
890  strata_estimator_destroy(remote_se);
892  op->state->se = NULL;
894  "got se diff=%d, using ibf size %d\n",
895  diff,
896  1U << get_order_from_difference(diff));
897 
898  {
899  char *set_debug;
900 
901  set_debug = getenv("GNUNET_SET_BENCHMARK");
902  if ((NULL != set_debug) &&
903  (0 == strcmp(set_debug, "1")))
904  {
905  FILE *f = fopen("set.log", "a");
906  fprintf(f, "%llu\n", (unsigned long long)diff);
907  fclose(f);
908  }
909  }
910 
911  if ((GNUNET_YES == op->byzantine) &&
912  (other_size < op->byzantine_lower_bound))
913  {
914  GNUNET_break(0);
916  return;
917  }
918 
919  if ((GNUNET_YES == op->force_full) ||
920  (diff > op->state->initial_size / 4) ||
921  (0 == other_size))
922  {
924  "Deciding to go for full set transmission (diff=%d, own set=%u)\n",
925  diff,
926  op->state->initial_size);
928  "# of full sends",
929  1,
930  GNUNET_NO);
931  if ((op->state->initial_size <= other_size) ||
932  (0 == other_size))
933  {
934  send_full_set(op);
935  }
936  else
937  {
938  struct GNUNET_MQ_Envelope *ev;
939 
941  "Telling other peer that we expect its full set\n");
944  GNUNET_MQ_send(op->mq,
945  ev);
946  }
947  }
948  else
949  {
951  "# of ibf sends",
952  1,
953  GNUNET_NO);
954  if (GNUNET_OK !=
955  send_ibf(op,
957  {
958  /* Internal error, best we can do is shut the connection */
960  "Failed to send IBF, closing connection\n");
962  return;
963  }
964  }
966 }
967 
968 
976 static int
978  uint32_t key,
979  void *value)
980 {
981  struct SendElementClosure *sec = cls;
982  struct Operation *op = sec->op;
983  struct KeyEntry *ke = value;
984  struct GNUNET_MQ_Envelope *ev;
985  struct GNUNET_MessageHeader *mh;
986 
987  /* Detect 32-bit key collision for the 64-bit IBF keys. */
988  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
989  return GNUNET_YES;
990 
992  sizeof(struct GNUNET_HashCode),
994 
995  GNUNET_assert(NULL != ev);
996  *(struct GNUNET_HashCode *)&mh[1] = ke->element->element_hash;
998  "[OP %x] sending element offer (%s) to peer\n",
999  (void *)op,
1001  GNUNET_MQ_send(op->mq, ev);
1002  return GNUNET_YES;
1003 }
1004 
1005 
1012 static void
1014  struct IBF_Key ibf_key)
1015 {
1016  struct SendElementClosure send_cls;
1017 
1018  send_cls.ibf_key = ibf_key;
1019  send_cls.op = op;
1021  (uint32_t)ibf_key.key_val,
1023  &send_cls);
1024 }
1025 
1026 
1034 static int
1036 {
1037  struct IBF_Key key;
1038  struct IBF_Key last_key;
1039  int side;
1040  unsigned int num_decoded;
1041  struct InvertibleBloomFilter *diff_ibf;
1042 
1044 
1045  if (GNUNET_OK !=
1046  prepare_ibf(op,
1047  op->state->remote_ibf->size))
1048  {
1049  GNUNET_break(0);
1050  /* allocation failed */
1051  return GNUNET_SYSERR;
1052  }
1053  diff_ibf = ibf_dup(op->state->local_ibf);
1054  ibf_subtract(diff_ibf,
1055  op->state->remote_ibf);
1056 
1058  op->state->remote_ibf = NULL;
1059 
1061  "decoding IBF (size=%u)\n",
1062  diff_ibf->size);
1063 
1064  num_decoded = 0;
1065  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1066 
1067  while (1)
1068  {
1069  int res;
1070  int cycle_detected = GNUNET_NO;
1071 
1072  last_key = key;
1073 
1074  res = ibf_decode(diff_ibf, &side, &key);
1075  if (res == GNUNET_OK)
1076  {
1078  "decoded ibf key %lx\n",
1079  (unsigned long)key.key_val);
1080  num_decoded += 1;
1081  if ((num_decoded > diff_ibf->size) ||
1082  ((num_decoded > 1) &&
1083  (last_key.key_val == key.key_val)))
1084  {
1086  "detected cyclic ibf (decoded %u/%u)\n",
1087  num_decoded,
1088  diff_ibf->size);
1089  cycle_detected = GNUNET_YES;
1090  }
1091  }
1092  if ((GNUNET_SYSERR == res) ||
1093  (GNUNET_YES == cycle_detected))
1094  {
1095  int next_order;
1096  next_order = 0;
1097  while (1 << next_order < diff_ibf->size)
1098  next_order++;
1099  next_order++;
1100  if (next_order <= MAX_IBF_ORDER)
1101  {
1103  "decoding failed, sending larger ibf (size %u)\n",
1104  1 << next_order);
1106  "# of IBF retries",
1107  1,
1108  GNUNET_NO);
1109  op->state->salt_send++;
1110  if (GNUNET_OK !=
1111  send_ibf(op, next_order))
1112  {
1113  /* Internal error, best we can do is shut the connection */
1115  "Failed to send IBF, closing connection\n");
1117  ibf_destroy(diff_ibf);
1118  return GNUNET_SYSERR;
1119  }
1120  }
1121  else
1122  {
1124  "# of failed union operations (too large)",
1125  1,
1126  GNUNET_NO);
1127  // XXX: Send the whole set, element-by-element
1129  "set union failed: reached ibf limit\n");
1131  ibf_destroy(diff_ibf);
1132  return GNUNET_SYSERR;
1133  }
1134  break;
1135  }
1136  if (GNUNET_NO == res)
1137  {
1138  struct GNUNET_MQ_Envelope *ev;
1139 
1141  "transmitted all values, sending DONE\n");
1143  GNUNET_MQ_send(op->mq, ev);
1144  /* We now wait until we get a DONE message back
1145  * and then wait for our MQ to be flushed and all our
1146  * demands be delivered. */
1147  break;
1148  }
1149  if (1 == side)
1150  {
1151  struct IBF_Key unsalted_key;
1152 
1153  unsalt_key(&key,
1154  op->state->salt_receive,
1155  &unsalted_key);
1157  unsalted_key);
1158  }
1159  else if (-1 == side)
1160  {
1161  struct GNUNET_MQ_Envelope *ev;
1162  struct InquiryMessage *msg;
1163 
1164  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1165  * the effort and additional complexity. */
1166  ev = GNUNET_MQ_msg_extra(msg,
1167  sizeof(struct IBF_Key),
1169  msg->salt = htonl(op->state->salt_receive);
1170  GNUNET_memcpy(&msg[1],
1171  &key,
1172  sizeof(struct IBF_Key));
1174  "sending element inquiry for IBF key %lx\n",
1175  (unsigned long)key.key_val);
1176  GNUNET_MQ_send(op->mq, ev);
1177  }
1178  else
1179  {
1180  GNUNET_assert(0);
1181  }
1182  }
1183  ibf_destroy(diff_ibf);
1184  return GNUNET_OK;
1185 }
1186 
1187 
1198 int
1200  const struct IBFMessage *msg)
1201 {
1202  struct Operation *op = cls;
1203  unsigned int buckets_in_message;
1204 
1206  {
1207  GNUNET_break_op(0);
1208  return GNUNET_SYSERR;
1209  }
1210  buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1211  if (0 == buckets_in_message)
1212  {
1213  GNUNET_break_op(0);
1214  return GNUNET_SYSERR;
1215  }
1216  if ((ntohs(msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
1217  {
1218  GNUNET_break_op(0);
1219  return GNUNET_SYSERR;
1220  }
1221  if (op->state->phase == PHASE_EXPECT_IBF_CONT)
1222  {
1223  if (ntohl(msg->offset) != op->state->ibf_buckets_received)
1224  {
1225  GNUNET_break_op(0);
1226  return GNUNET_SYSERR;
1227  }
1228  if (1 << msg->order != op->state->remote_ibf->size)
1229  {
1230  GNUNET_break_op(0);
1231  return GNUNET_SYSERR;
1232  }
1233  if (ntohl(msg->salt) != op->state->salt_receive)
1234  {
1235  GNUNET_break_op(0);
1236  return GNUNET_SYSERR;
1237  }
1238  }
1239  else if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
1240  (op->state->phase != PHASE_EXPECT_IBF))
1241  {
1242  GNUNET_break_op(0);
1243  return GNUNET_SYSERR;
1244  }
1245 
1246  return GNUNET_OK;
1247 }
1248 
1249 
1259 void
1261  const struct IBFMessage *msg)
1262 {
1263  struct Operation *op = cls;
1264  unsigned int buckets_in_message;
1265 
1266  buckets_in_message = (ntohs(msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
1267  if ((op->state->phase == PHASE_INVENTORY_PASSIVE) ||
1268  (op->state->phase == PHASE_EXPECT_IBF))
1269  {
1271  GNUNET_assert(NULL == op->state->remote_ibf);
1273  "Creating new ibf of size %u\n",
1274  1 << msg->order);
1275  op->state->remote_ibf = ibf_create(1 << msg->order, SE_IBF_HASH_NUM);
1276  op->state->salt_receive = ntohl(msg->salt);
1278  "Receiving new IBF with salt %u\n",
1279  op->state->salt_receive);
1280  if (NULL == op->state->remote_ibf)
1281  {
1283  "Failed to parse remote IBF, closing connection\n");
1285  return;
1286  }
1287  op->state->ibf_buckets_received = 0;
1288  if (0 != ntohl(msg->offset))
1289  {
1290  GNUNET_break_op(0);
1292  return;
1293  }
1294  }
1295  else
1296  {
1299  "Received more of IBF\n");
1300  }
1301  GNUNET_assert(NULL != op->state->remote_ibf);
1302 
1303  ibf_read_slice(&msg[1],
1305  buckets_in_message,
1306  op->state->remote_ibf);
1307  op->state->ibf_buckets_received += buckets_in_message;
1308 
1309  if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
1310  {
1312  "received full ibf\n");
1314  if (GNUNET_OK !=
1315  decode_and_send(op))
1316  {
1317  /* Internal error, best we can do is shut down */
1319  "Failed to decode IBF, closing connection\n");
1321  return;
1322  }
1323  }
1325 }
1326 
1327 
1336 static void
1338  struct GNUNET_SET_Element *element,
1339  int status)
1340 {
1341  struct GNUNET_MQ_Envelope *ev;
1342  struct GNUNET_SET_ResultMessage *rm;
1343 
1345  "sending element (size %u) to client\n",
1346  element->size);
1347  GNUNET_assert(0 != op->client_request_id);
1349  if (NULL == ev)
1350  {
1351  GNUNET_MQ_discard(ev);
1352  GNUNET_break(0);
1353  return;
1354  }
1355  rm->result_status = htons(status);
1356  rm->request_id = htonl(op->client_request_id);
1357  rm->element_type = htons(element->element_type);
1359  GNUNET_memcpy(&rm[1],
1360  element->data,
1361  element->size);
1362  GNUNET_MQ_send(op->set->cs->mq,
1363  ev);
1364 }
1365 
1366 
1373 static void
1375 {
1376  struct Operation *op = cls;
1377  struct GNUNET_MQ_Envelope *ev;
1378  struct GNUNET_SET_ResultMessage *rm;
1379 
1380  if (GNUNET_YES == op->state->client_done_sent)
1381  {
1382  return;
1383  }
1384 
1385  if (PHASE_DONE != op->state->phase)
1386  {
1388  "Union operation failed\n");
1390  "# Union operations failed",
1391  1,
1392  GNUNET_NO);
1395  rm->request_id = htonl(op->client_request_id);
1396  rm->element_type = htons(0);
1397  GNUNET_MQ_send(op->set->cs->mq,
1398  ev);
1399  return;
1400  }
1401 
1403 
1405  "# Union operations succeeded",
1406  1,
1407  GNUNET_NO);
1409  "Signalling client that union operation is done\n");
1410  ev = GNUNET_MQ_msg(rm,
1412  rm->request_id = htonl(op->client_request_id);
1414  rm->element_type = htons(0);
1416  GNUNET_MQ_send(op->set->cs->mq,
1417  ev);
1418 }
1419 
1420 
1426 static void
1428 {
1429  unsigned int num_demanded;
1430 
1432 
1433  if (PHASE_FINISH_WAITING == op->state->phase)
1434  {
1436  "In PHASE_FINISH_WAITING, pending %u demands\n",
1437  num_demanded);
1438  if (0 == num_demanded)
1439  {
1440  struct GNUNET_MQ_Envelope *ev;
1441 
1442  op->state->phase = PHASE_DONE;
1444  GNUNET_MQ_send(op->mq,
1445  ev);
1446  /* We now wait until the other peer sends P2P_OVER
1447  * after it got all elements from us. */
1448  }
1449  }
1450  if (PHASE_FINISH_CLOSING == op->state->phase)
1451  {
1453  "In PHASE_FINISH_CLOSING, pending %u demands\n",
1454  num_demanded);
1455  if (0 == num_demanded)
1456  {
1457  op->state->phase = PHASE_DONE;
1458  send_client_done(op);
1460  }
1461  }
1462 }
1463 
1464 
1471 int
1473  const struct GNUNET_SET_ElementMessage *emsg)
1474 {
1475  struct Operation *op = cls;
1476 
1478  {
1479  GNUNET_break_op(0);
1480  return GNUNET_SYSERR;
1481  }
1483  {
1484  GNUNET_break_op(0);
1485  return GNUNET_SYSERR;
1486  }
1487  return GNUNET_OK;
1488 }
1489 
1490 
1499 void
1501  const struct GNUNET_SET_ElementMessage *emsg)
1502 {
1503  struct Operation *op = cls;
1504  struct ElementEntry *ee;
1505  struct KeyEntry *ke;
1506  uint16_t element_size;
1507 
1508  element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1509  ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1510  GNUNET_memcpy(&ee[1],
1511  &emsg[1],
1512  element_size);
1513  ee->element.size = element_size;
1514  ee->element.data = &ee[1];
1515  ee->element.element_type = ntohs(emsg->element_type);
1516  ee->remote = GNUNET_YES;
1518  &ee->element_hash);
1519  if (GNUNET_NO ==
1521  &ee->element_hash,
1522  NULL))
1523  {
1524  /* We got something we didn't demand, since it's not in our map. */
1525  GNUNET_break_op(0);
1527  return;
1528  }
1529 
1531  "Got element (size %u, hash %s) from peer\n",
1532  (unsigned int)element_size,
1533  GNUNET_h2s(&ee->element_hash));
1534 
1536  "# received elements",
1537  1,
1538  GNUNET_NO);
1540  "# exchanged elements",
1541  1,
1542  GNUNET_NO);
1543 
1544  op->state->received_total++;
1545 
1546  ke = op_get_element(op, &ee->element_hash);
1547  if (NULL != ke)
1548  {
1549  /* Got repeated element. Should not happen since
1550  * we track demands. */
1552  "# repeated elements",
1553  1,
1554  GNUNET_NO);
1555  ke->received = GNUNET_YES;
1556  GNUNET_free(ee);
1557  }
1558  else
1559  {
1561  "Registering new element from remote peer\n");
1562  op->state->received_fresh++;
1564  /* only send results immediately if the client wants it */
1565  switch (op->result_mode)
1566  {
1569  break;
1570 
1573  break;
1574 
1575  default:
1576  /* Result mode not supported, should have been caught earlier. */
1577  GNUNET_break(0);
1578  break;
1579  }
1580  }
1581 
1582  if ((op->state->received_total > 8) &&
1583  (op->state->received_fresh < op->state->received_total / 3))
1584  {
1585  /* The other peer gave us lots of old elements, there's something wrong. */
1586  GNUNET_break_op(0);
1588  return;
1589  }
1591  maybe_finish(op);
1592 }
1593 
1594 
1601 int
1603  const struct GNUNET_SET_ElementMessage *emsg)
1604 {
1605  struct Operation *op = cls;
1606 
1608  {
1609  GNUNET_break_op(0);
1610  return GNUNET_SYSERR;
1611  }
1612  // FIXME: check that we expect full elements here?
1613  return GNUNET_OK;
1614 }
1615 
1616 
1623 void
1625  const struct GNUNET_SET_ElementMessage *emsg)
1626 {
1627  struct Operation *op = cls;
1628  struct ElementEntry *ee;
1629  struct KeyEntry *ke;
1630  uint16_t element_size;
1631 
1632  element_size = ntohs(emsg->header.size) - sizeof(struct GNUNET_SET_ElementMessage);
1633  ee = GNUNET_malloc(sizeof(struct ElementEntry) + element_size);
1634  GNUNET_memcpy(&ee[1], &emsg[1], element_size);
1635  ee->element.size = element_size;
1636  ee->element.data = &ee[1];
1637  ee->element.element_type = ntohs(emsg->element_type);
1638  ee->remote = GNUNET_YES;
1640 
1642  "Got element (full diff, size %u, hash %s) from peer\n",
1643  (unsigned int)element_size,
1644  GNUNET_h2s(&ee->element_hash));
1645 
1647  "# received elements",
1648  1,
1649  GNUNET_NO);
1651  "# exchanged elements",
1652  1,
1653  GNUNET_NO);
1654 
1655  op->state->received_total++;
1656 
1657  ke = op_get_element(op, &ee->element_hash);
1658  if (NULL != ke)
1659  {
1660  /* Got repeated element. Should not happen since
1661  * we track demands. */
1663  "# repeated elements",
1664  1,
1665  GNUNET_NO);
1666  ke->received = GNUNET_YES;
1667  GNUNET_free(ee);
1668  }
1669  else
1670  {
1672  "Registering new element from remote peer\n");
1673  op->state->received_fresh++;
1675  /* only send results immediately if the client wants it */
1676  switch (op->result_mode)
1677  {
1680  break;
1681 
1684  break;
1685 
1686  default:
1687  /* Result mode not supported, should have been caught earlier. */
1688  GNUNET_break(0);
1689  break;
1690  }
1691  }
1692 
1693  if ((GNUNET_YES == op->byzantine) &&
1694  (op->state->received_total > 384 + op->state->received_fresh * 4) &&
1695  (op->state->received_fresh < op->state->received_total / 6))
1696  {
1697  /* The other peer gave us lots of old elements, there's something wrong. */
1699  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
1700  (unsigned long long)op->state->received_fresh,
1701  (unsigned long long)op->state->received_total);
1702  GNUNET_break_op(0);
1704  return;
1705  }
1707 }
1708 
1709 
1717 int
1719  const struct InquiryMessage *msg)
1720 {
1721  struct Operation *op = cls;
1722  unsigned int num_keys;
1723 
1725  {
1726  GNUNET_break_op(0);
1727  return GNUNET_SYSERR;
1728  }
1729  if (op->state->phase != PHASE_INVENTORY_PASSIVE)
1730  {
1731  GNUNET_break_op(0);
1732  return GNUNET_SYSERR;
1733  }
1734  num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1735  / sizeof(struct IBF_Key);
1736  if ((ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1737  != num_keys * sizeof(struct IBF_Key))
1738  {
1739  GNUNET_break_op(0);
1740  return GNUNET_SYSERR;
1741  }
1742  return GNUNET_OK;
1743 }
1744 
1745 
1753 void
1755  const struct InquiryMessage *msg)
1756 {
1757  struct Operation *op = cls;
1758  const struct IBF_Key *ibf_key;
1759  unsigned int num_keys;
1760 
1762  "Received union inquiry\n");
1763  num_keys = (ntohs(msg->header.size) - sizeof(struct InquiryMessage))
1764  / sizeof(struct IBF_Key);
1765  ibf_key = (const struct IBF_Key *)&msg[1];
1766  while (0 != num_keys--)
1767  {
1768  struct IBF_Key unsalted_key;
1769 
1770  unsalt_key(ibf_key,
1771  ntohl(msg->salt),
1772  &unsalted_key);
1774  unsalted_key);
1775  ibf_key++;
1776  }
1778 }
1779 
1780 
1791 static int
1793  uint32_t key,
1794  void *value)
1795 {
1796  struct Operation *op = cls;
1797  struct KeyEntry *ke = value;
1798  struct GNUNET_MQ_Envelope *ev;
1799  struct GNUNET_SET_ElementMessage *emsg;
1800  struct ElementEntry *ee = ke->element;
1801 
1802  if (GNUNET_YES == ke->received)
1803  return GNUNET_YES;
1804  ev = GNUNET_MQ_msg_extra(emsg,
1805  ee->element.size,
1807  GNUNET_memcpy(&emsg[1],
1808  ee->element.data,
1809  ee->element.size);
1810  emsg->element_type = htons(ee->element.element_type);
1811  GNUNET_MQ_send(op->mq,
1812  ev);
1813  return GNUNET_YES;
1814 }
1815 
1816 
1823 void
1825  const struct GNUNET_MessageHeader *mh)
1826 {
1827  struct Operation *op = cls;
1828 
1830  "Received request for full set transmission\n");
1832  {
1833  GNUNET_break_op(0);
1835  return;
1836  }
1837  if (PHASE_EXPECT_IBF != op->state->phase)
1838  {
1839  GNUNET_break_op(0);
1841  return;
1842  }
1843 
1844  // FIXME: we need to check that our set is larger than the
1845  // byzantine_lower_bound by some threshold
1846  send_full_set(op);
1848 }
1849 
1850 
1857 void
1859  const struct GNUNET_MessageHeader *mh)
1860 {
1861  struct Operation *op = cls;
1862 
1863  switch (op->state->phase)
1864  {
1865  case PHASE_EXPECT_IBF:
1866  {
1867  struct GNUNET_MQ_Envelope *ev;
1868 
1870  "got FULL DONE, sending elements that other peer is missing\n");
1871 
1872  /* send all the elements that did not come from the remote peer */
1875  op);
1876 
1878  GNUNET_MQ_send(op->mq,
1879  ev);
1880  op->state->phase = PHASE_DONE;
1881  /* we now wait until the other peer sends us the OVER message*/
1882  }
1883  break;
1884 
1885  case PHASE_FULL_SENDING:
1886  {
1888  "got FULL DONE, finishing\n");
1889  /* We sent the full set, and got the response for that. We're done. */
1890  op->state->phase = PHASE_DONE;
1892  send_client_done(op);
1894  return;
1895  }
1896  break;
1897 
1898  default:
1900  "Handle full done phase is %u\n",
1901  (unsigned)op->state->phase);
1902  GNUNET_break_op(0);
1904  return;
1905  }
1907 }
1908 
1909 
1918 int
1920  const struct GNUNET_MessageHeader *mh)
1921 {
1922  struct Operation *op = cls;
1923  unsigned int num_hashes;
1924 
1926  {
1927  GNUNET_break_op(0);
1928  return GNUNET_SYSERR;
1929  }
1930  num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1931  / sizeof(struct GNUNET_HashCode);
1932  if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1933  != num_hashes * sizeof(struct GNUNET_HashCode))
1934  {
1935  GNUNET_break_op(0);
1936  return GNUNET_SYSERR;
1937  }
1938  return GNUNET_OK;
1939 }
1940 
1941 
1949 void
1951  const struct GNUNET_MessageHeader *mh)
1952 {
1953  struct Operation *op = cls;
1954  struct ElementEntry *ee;
1955  struct GNUNET_SET_ElementMessage *emsg;
1956  const struct GNUNET_HashCode *hash;
1957  unsigned int num_hashes;
1958  struct GNUNET_MQ_Envelope *ev;
1959 
1960  num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
1961  / sizeof(struct GNUNET_HashCode);
1962  for (hash = (const struct GNUNET_HashCode *)&mh[1];
1963  num_hashes > 0;
1964  hash++, num_hashes--)
1965  {
1967  hash);
1968  if (NULL == ee)
1969  {
1970  /* Demand for non-existing element. */
1971  GNUNET_break_op(0);
1973  return;
1974  }
1975  if (GNUNET_NO == _GSS_is_element_of_operation(ee, op))
1976  {
1977  /* Probably confused lazily copied sets. */
1978  GNUNET_break_op(0);
1980  return;
1981  }
1983  GNUNET_memcpy(&emsg[1], ee->element.data, ee->element.size);
1984  emsg->reserved = htons(0);
1985  emsg->element_type = htons(ee->element.element_type);
1987  "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
1988  (void *)op,
1989  (unsigned int)ee->element.size,
1990  GNUNET_h2s(&ee->element_hash));
1991  GNUNET_MQ_send(op->mq, ev);
1993  "# exchanged elements",
1994  1,
1995  GNUNET_NO);
1996 
1997  switch (op->result_mode)
1998  {
2000  /* Nothing to do. */
2001  break;
2002 
2005  break;
2006 
2007  default:
2008  /* Result mode not supported, should have been caught earlier. */
2009  GNUNET_break(0);
2010  break;
2011  }
2012  }
2014 }
2015 
2016 
/**
 * Check an offer message (a list of `struct GNUNET_HashCode`s) before
 * it is handled.
 *
 * The message is rejected when the operation is in neither of the two
 * accepted phases (PHASE_INVENTORY_PASSIVE is one; the other is on a
 * line omitted from this listing), or when the payload length is not
 * an exact multiple of sizeof(struct GNUNET_HashCode).
 *
 * NOTE(review): the function name line (2025) and the condition of the
 * first check (2031) are not visible in this listing.
 *
 * @param cls the union operation (used as `struct Operation *`)
 * @param mh the offer message
 * @return #GNUNET_OK if the message is well-formed and expected,
 *         #GNUNET_SYSERR otherwise
 */
2024 int
2026  const struct GNUNET_MessageHeader *mh)
2027 {
2028  struct Operation *op = cls;
2029  unsigned int num_hashes;
2030 
2032  {
2033  GNUNET_break_op(0);
2034  return GNUNET_SYSERR;
2035  }
2036  /* look up elements and send them */
2037  if ((op->state->phase != PHASE_INVENTORY_PASSIVE) &&
2039  {
2040  GNUNET_break_op(0);
2041  return GNUNET_SYSERR;
2042  }
      /* Integer division truncates; comparing the re-multiplied count
       * with the payload size detects a trailing partial hash. */
2043  num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2044  / sizeof(struct GNUNET_HashCode);
2045  if ((ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2046  num_hashes * sizeof(struct GNUNET_HashCode))
2047  {
2048  GNUNET_break_op(0);
2049  return GNUNET_SYSERR;
2050  }
2051  return GNUNET_OK;
2052 }
2053 
2054 
/**
 * Handle offers (of `struct GNUNET_HashCode`s) and respond with
 * demands (of `struct GNUNET_HashCode`s).
 *
 * Each offered hash is looked up.  Hashes for which a demand was
 * already recorded are skipped ("Skipped sending duplicate demand");
 * for the remaining ones a demand message consisting of a header plus
 * the single hash is queued on op->mq.
 *
 * NOTE(review): the lookup call (2080), the condition guarding the
 * first `continue` (2083) and the call that records the demanded hash
 * (2095-2099) are on lines omitted from this listing.
 *
 * @param cls the union operation (used as `struct Operation *`)
 * @param mh the offer message; the hashes start at &mh[1]
 */
2062 void
2064  const struct GNUNET_MessageHeader *mh)
2065 {
2066  struct Operation *op = cls;
2067  const struct GNUNET_HashCode *hash;
2068  unsigned int num_hashes;
2069 
2070  num_hashes = (ntohs(mh->size) - sizeof(struct GNUNET_MessageHeader))
2071  / sizeof(struct GNUNET_HashCode);
2072  for (hash = (const struct GNUNET_HashCode *)&mh[1];
2073  num_hashes > 0;
2074  hash++, num_hashes--)
2075  {
2076  struct ElementEntry *ee;
2077  struct GNUNET_MessageHeader *demands;
2078  struct GNUNET_MQ_Envelope *ev;
2079 
2081  hash);
      /* An entry was found for the offered hash; the statement that
       * decides whether to skip it sits on the omitted line 2083. */
2082  if (NULL != ee)
2084  continue;
2085 
2086  if (GNUNET_YES ==
2088  hash))
2089  {
2091  "Skipped sending duplicate demand\n");
2092  continue;
2093  }
2094 
2097  hash,
2098  NULL,
2100 
2102  "[OP %x] Requesting element (hash %s)\n",
2103  (void *)op, GNUNET_h2s(hash));
      /* One demand message per hash: header followed by the hash. */
2104  ev = GNUNET_MQ_msg_header_extra(demands,
2105  sizeof(struct GNUNET_HashCode),
2107  GNUNET_memcpy(&demands[1],
2108  hash,
2109  sizeof(struct GNUNET_HashCode));
2110  GNUNET_MQ_send(op->mq, ev);
2111  }
2113 }
2114 
2115 
/**
 * Handle a done message from a remote peer.
 *
 * After an initial sanity check, the handler dispatches on
 * op->state->phase.  Two phases are accepted: one in which we act as
 * the passive partner and one in which we act as the active partner;
 * both end by calling maybe_finish().  Any other phase is treated as a
 * protocol violation.
 *
 * NOTE(review): the function name line, the condition of the first
 * check (2128), both case labels and the phase assignments are on
 * lines omitted from this listing.
 *
 * @param cls the union operation (used as `struct Operation *`)
 * @param mh the done message (not inspected in the visible code)
 */
2122 void
2124  const struct GNUNET_MessageHeader *mh)
2125 {
2126  struct Operation *op = cls;
2127 
2129  {
2130  GNUNET_break_op(0);
2132  return;
2133  }
2134  switch (op->state->phase)
2135  {
2137  /* We got all requests, but still have to send our elements in response. */
2139 
2141  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2142  /* The active peer is done sending offers
2143  * and inquiries. This means that all
2144  * our responses to that (demands and offers)
2145  * must be in flight (queued or in mesh).
2146  *
2147  * We should notify the active peer once
2148  * all our demands are satisfied, so that the active
2149  * peer can quit if we gave it everything.
2150  */
2152  maybe_finish(op);
2153  return;
2154 
2157  "got DONE (as active partner), waiting to finish\n");
2158  /* All demands of the other peer are satisfied,
2159  * and we processed all offers, thus we know
2160  * exactly what our demands must be.
2161  *
2162  * We'll close the channel
2163  * to the other peer once our demands are met.
2164  */
2167  maybe_finish(op);
2168  return;
2169 
2170  default:
      /* DONE is not expected in any other phase. */
2171  GNUNET_break_op(0);
2173  return;
2174  }
2175 }
2176 
/**
 * Handle an over message from a remote peer.
 *
 * Simply forwards the closure to send_client_done().
 *
 * NOTE(review): the function name line (2184) is omitted from this
 * listing.
 *
 * @param cls closure handed on to send_client_done()
 * @param mh the over message (unused)
 */
2183 void
2185  const struct GNUNET_MessageHeader *mh)
2186 {
2187  send_client_done(cls);
2188 }
2189 
2190 
/**
 * Initiate operation to evaluate a set union with a remote peer.
 *
 * Builds an operation request that nests @a opaque_context, creates the
 * per-operation state with a copy of the set's strata estimator, moves
 * to PHASE_EXPECT_SE and sends the request on op->mq.  The new state is
 * also stored in op->state.
 *
 * NOTE(review): the function name line and several call lines (e.g.
 * 2207, 2216, 2245-2246) are omitted from this listing.
 *
 * @param op operation to perform
 * @param opaque_context message to be nested in the request, may be NULL
 * @return the newly allocated operation state, or NULL if the context
 *         message is too large to be nested in the request
 */
2198 static struct OperationState *
2200  const struct GNUNET_MessageHeader *opaque_context)
2201 {
2202  struct OperationState *state;
2203  struct GNUNET_MQ_Envelope *ev;
2204  struct OperationRequestMessage *msg;
2205 
2206  ev = GNUNET_MQ_msg_nested_mh(msg,
2208  opaque_context);
2209  if (NULL == ev)
2210  {
2211  /* the context message is too large */
2212  GNUNET_break(0);
2213  return NULL;
2214  }
2215  state = GNUNET_new(struct OperationState);
2217  GNUNET_NO);
2218  /* copy the current generation's strata estimator for this operation */
2219  state->se = strata_estimator_dup(op->set->state->se);
2220  /* we started the operation, thus we have to send the operation request */
2221  state->phase = PHASE_EXPECT_SE;
      /* Both salts start from the same hard-coded value. */
2222  state->salt_receive = state->salt_send = 42; // FIXME?????
2224  "Initiating union operation evaluation\n");
2226  "# of total union operations",
2227  1,
2228  GNUNET_NO);
2230  "# of initiated union operations",
2231  1,
2232  GNUNET_NO);
2233  msg->operation = htonl(GNUNET_SET_OPERATION_UNION);
2234  GNUNET_MQ_send(op->mq,
2235  ev);
2236 
2237  if (NULL != opaque_context)
2239  "sent op request with context message\n");
2240  else
2242  "sent op request without context message\n");
2243 
2244  op->state = state;
2247  return state;
2248 }
2249 
2250 
/**
 * Accept a union operation request from a remote peer.
 *
 * Creates the per-operation state with a copy of the set's strata
 * estimator, stores it in op->state, serializes the estimator into a
 * temporary buffer and sends it to the peer on op->mq.  Afterwards the
 * operation waits for an IBF (PHASE_EXPECT_IBF).
 *
 * NOTE(review): the function name line, the buffer allocation (2290),
 * both message type assignments (2294, 2296) and the value assigned to
 * set_size (2305) are omitted from this listing.  The comparison at
 * 2293 presumably selects the compressed message type when the
 * serialized form is shorter than the raw size -- confirm against the
 * full source.
 *
 * @param op operation that will be accepted as a union operation
 * @return the newly allocated operation state
 */
2257 static struct OperationState *
2259 {
2260  struct OperationState *state;
2261  const struct StrataEstimator *se;
2262  struct GNUNET_MQ_Envelope *ev;
2263  struct StrataEstimatorMessage *strata_msg;
2264  char *buf;
2265  size_t len;
2266  uint16_t type;
2267 
2269  "accepting set union operation\n");
2271  "# of accepted union operations",
2272  1,
2273  GNUNET_NO);
2275  "# of total union operations",
2276  1,
2277  GNUNET_NO);
2278 
2279  state = GNUNET_new(struct OperationState);
2280  state->se = strata_estimator_dup(op->set->state->se);
2282  GNUNET_NO);
2283  state->salt_receive = state->salt_send = 42; // FIXME?????
2284  op->state = state;
2287 
2288  /* kick off the operation */
2289  se = state->se;
2291  len = strata_estimator_write(se,
2292  buf);
2293  if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
2295  else
2297  ev = GNUNET_MQ_msg_extra(strata_msg,
2298  len,
2299  type);
      /* The serialized estimator follows the message struct; the
       * temporary buffer is released once it has been copied. */
2300  GNUNET_memcpy(&strata_msg[1],
2301  buf,
2302  len);
2303  GNUNET_free(buf);
2304  strata_msg->set_size
2306  GNUNET_MQ_send(op->mq,
2307  ev);
2308  state->phase = PHASE_EXPECT_IBF;
2309  return state;
2310 }
2311 
2312 
/**
 * Create a new set supporting the union operation.
 *
 * Allocates the set state and its strata estimator.  If the estimator
 * could not be allocated, the set state is freed again.
 *
 * NOTE(review): the function name line and the estimator creation call
 * (2329-2330) are omitted from this listing.
 *
 * @return the newly created set state, or NULL if the strata estimator
 *         could not be allocated
 */
2321 static struct SetState *
2323 {
2324  struct SetState *set_state;
2325 
2327  "union set created\n");
2328  set_state = GNUNET_new(struct SetState);
2331  if (NULL == set_state->se)
2332  {
2334  "Failed to allocate strata estimator\n");
2335  GNUNET_free(set_state);
2336  return NULL;
2337  }
2338  return set_state;
2339 }
2340 
2341 
/**
 * Add an element to the union set state.
 *
 * Inserts the IBF key derived from the element's hash into the set's
 * strata estimator.
 *
 * @param set_state state of the set to add the element to
 * @param ee the element entry to add
 */
2348 static void
2349 union_add(struct SetState *set_state,
2350  struct ElementEntry *ee)
2351 {
2352  strata_estimator_insert(set_state->se,
2353  get_ibf_key(&ee->element_hash));
2354 }
2355 
2356 
/**
 * Remove an element from the union set state.
 *
 * Removes the IBF key derived from the element's hash from the set's
 * strata estimator.
 *
 * @param set_state state of the set to remove the element from
 * @param ee the element entry to remove
 */
2364 static void
2365 union_remove(struct SetState *set_state,
2366  struct ElementEntry *ee)
2367 {
2368  strata_estimator_remove(set_state->se,
2369  get_ibf_key(&ee->element_hash));
2370 }
2371 
2372 
/**
 * Destroy a set that supports the union operation.
 *
 * Releases the strata estimator (if one is attached) and then frees
 * the set state itself.
 *
 * @param set_state the set state to destroy; invalid after the call
 */
2378 static void
2379 union_set_destroy(struct SetState *set_state)
2380 {
2381  if (NULL != set_state->se)
2382  {
2383  strata_estimator_destroy(set_state->se);
2384  set_state->se = NULL;
2385  }
2386  GNUNET_free(set_state);
2387 }
2388 
2389 
/**
 * Copy union-specific set state.
 *
 * The copy receives its own duplicate of the strata estimator.
 *
 * NOTE(review): the function name line (2397) is omitted from this
 * listing; the parameter is referred to as `state` in the body.
 *
 * @param state source state to copy; must be non-NULL and must carry a
 *        strata estimator (both are asserted)
 * @return the newly allocated copy of the set state
 */
2396 static struct SetState *
2398 {
2399  struct SetState *new_state;
2400 
2401  GNUNET_assert((NULL != state) &&
2402  (NULL != state->se));
2403  new_state = GNUNET_new(struct SetState);
2404  new_state->se = strata_estimator_dup(state->se);
2405 
2406  return new_state;
2407 }
2408 
2409 
/**
 * Handle case where channel went down for an operation.
 *
 * Calls send_client_done() for the operation and then makes a second
 * call whose last argument is GNUNET_YES.
 *
 * NOTE(review): the function name line (2416) and the callee of the
 * second call (2419) are omitted from this listing; presumably it
 * destroys the operation -- confirm against the full source.
 *
 * @param op operation that lost the channel
 */
2415 static void
2417 {
2418  send_client_done(op);
2420  GNUNET_YES);
2421 }
2422 
2423 
/**
 * Get the table with implementing functions for set union.
 *
 * The table is a function-local `static const` object, so the same
 * address is returned on every call.
 *
 * NOTE(review): the function name line (2431) and the first
 * initializer (2434) are omitted from this listing.
 *
 * @return the operation specific VTable
 */
2430 const struct SetVT *
2432 {
2433  static const struct SetVT union_vt = {
2435  .add = &union_add,
2436  .remove = &union_remove,
2437  .destroy_set = &union_set_destroy,
2438  .evaluate = &union_evaluate,
2439  .accept = &union_accept,
2440  .cancel = &union_op_cancel,
2441  .copy_state = &union_copy_state,
2442  .channel_death = &union_channel_death
2443  };
2444 
2445  return &union_vt;
2446 }
Context for op_get_element_iterator.
We sent the strata estimator, and expect an IBF.
uint32_t offset
Offset of the strata in the rest of the message.
Client gets only elements that have been added to the set.
struct StrataEstimator * se
The strata estimator is only generated once for each set.
State of an evaluate operation with another peer.
static struct OperationState * union_accept(struct Operation *op)
Accept a union operation request from a remote peer.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
The other peer is decoding the IBF we just sent.
#define LOG(kind,...)
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS
Actual set elements.
enum GNUNET_SET_ResultMode result_mode
When are elements sent to the client, and which elements are sent?
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet...
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set...
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
static unsigned int element_size
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
uint8_t reserved1
Padding, must be 0.
void handle_union_p2p_elements(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Handle an element message from a remote peer.
int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct...
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
Element should be added to the result set of the remote peer, i.e.
void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
We are decoding an IBF.
struct SetState * state
Implementation-specific state.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: set.h:238
uint32_t salt
Salt currently used for BF construction (by us or the other peer, depending on where we are in the co...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
Element stored in a set.
struct GNUNET_HashCode element_hash
Hash of the element.
static unsigned int get_order_from_difference(unsigned int diff)
Compute the necessary order of an ibf from the size of the symmetric set difference.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
Message containing buckets of an invertible bloom filter.
Element should be added to the result set of the local peer, i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_CADET_Channel * channel
Channel to the peer.
int check_union_p2p_elements(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Check an element message from a remote peer.
Invertible bloom filter (IBF).
Definition: ibf.h:79
struct ElementEntry * element
The actual element associated with the key.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_ADD or GNUNET_MESSAGE_TYPE_SET_REMOVE.
Definition: set.h:278
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
#define GNUNET_MESSAGE_TYPE_SET_RESULT
Create an empty set.
uint32_t request_id
id the result belongs to
Definition: set.h:252
UnionOperationPhase
Current phase we are in for a union operation.
void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
int check_union_p2p_full_element(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Check a full element message from a remote peer.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
#define GNUNET_NO
Definition: gnunet_common.h:78
void _GSS_operation_destroy(struct Operation *op, int gc)
Destroy the given operation.
uint64_t initial_size
Initial size of our set, just before the operation started.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
void handle_union_p2p_full_element(void *cls, const struct GNUNET_SET_ElementMessage *emsg)
Handle an element message from a remote peer.
void strata_estimator_remove(struct StrataEstimator *se, struct IBF_Key key)
Remove a key from the strata estimator.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint64_t key_val
Definition: ibf.h:46
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
int client_done_sent
Did we send the client that we are done?
void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:287
uint32_t salt_send
Salt that we're using for sending IBFs.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL)...
static void union_set_destroy(struct SetState *set_state)
Destroy a set that supports the union operation.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
static void union_add(struct SetState *set_state, struct ElementEntry *ee)
Add the element from the given element message to the set.
static int ret
Final status code.
Definition: gnunet-arm.c:89
We sent the request message, and expect a strata estimator.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
static struct OperationState * union_evaluate(struct Operation *op, const struct GNUNET_MessageHeader *opaque_context)
Initiate operation to evaluate a set union with a remote peer.
Success, all elements have been sent (and received).
const struct SetVT * _GSS_union_vt()
Get the table with implementing functions for set union.
Internal representation of the hash map.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1241
uint16_t element_type
Type of the element to add or remove.
Definition: set.h:283
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
const void * data
Actual data of the element.
A handle to a strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
The key entry is used to associate an ibf key with an element.
uint64_t current_size
Current set size.
Definition: set.h:247
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:371
static int send_ibf(struct Operation *op, uint16_t ibf_order)
Send an ibf of appropriate size.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
uint32_t received_total
Total number of elements received from the other peer.
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:83
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
static struct SetState * union_set_create(void)
Create a new set supporting the union operation.
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
In the penultimate phase, we wait until all our demands are satisfied.
struct StrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST
Request a set operation from a remote peer.
void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
enum State state
current state of profiling
There must only be one value per key, but don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC
Compressed strata estimator.
static char * value
Value of the record to add/remove.
Information about an element in the set.
The other peer refused to do the operation with us, or something went wrong.
int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Check a strata estimator from a remote peer.
int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
Everything went ok, we are transmitting an element of the result (in set, or to be removed from set...
Dispatch table for a specific set operation.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set...
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries...
static void union_op_cancel(struct Operation *op)
Destroy the union operation.
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:225
uint8_t order
Order of the whole ibf, where num_buckets = 2^order.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
struct KeyEntry * k
FIXME.
enum IntersectionOperationPhase phase
Current state of the operation.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE
Strata estimator.
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SET_Status in NBO.
Definition: set.h:258
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:69
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
uint16_t status
See PRISM_STATUS_*-constants.
static char buf[2048]
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
#define MAX_IBF_ORDER
The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE
Set operation is done.
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:349
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
unsigned int strata_count
Size of the IBF array in strata.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
Internal representation of the hash map.
A 512-bit hashcode.
Client gets notified of the required changes for both the local and the remote set.
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static int res
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation...
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:319
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
uint16_t element_type
Type of the element attached to the message, if any.
Definition: set.h:263
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:318
unsigned int ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
struct GNUNET_HashCode key
The key used in the DHT.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
invertible bloom filter
static unsigned int size
Size of the "table".
Definition: peer.c:66
static void union_channel_death(struct Operation *op)
Handle case where channel went down for an operation.
void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space...
Definition: gnunet_mq_lib.h:88
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:392
const char * name
char * getenv()
Strata estimator together with the peer&#39;s overall set size.
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:76
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
enum GNUNET_SET_OperationType operation
Type of operation supported for this set.
Operation context used to execute a set operation.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
void handle_union_p2p_request_full(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request for full set transmission.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
Extra state required for efficient set intersection.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF
Invertible bloom filter.
uint16_t reserved2
Padding, must be 0.
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
Allow multiple values with the same key.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:164
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
static void send_client_element(struct Operation *op, struct GNUNET_SET_Element *element, int status)
Send a result message to the client indicating that there is a new element.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:951
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
uint16_t reserved
For alignment, always zero.
Definition: set.h:288
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
uint32_t salt
Salt used when hashing elements for this IBF.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
uint32_t client_request_id
ID used to identify an operation between service and client.
#define IBF_ALPHA
Number of buckets used in the ibf per estimated difference.
#define SE_IBF_SIZE
Size of the IBFs in the strata estimator.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:278
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
uint16_t size
Number of bytes in the buffer pointed to by data.
common components for the implementation of the different set operations
Continuation for multi part IBFs.
#define GNUNET_log(kind,...)
two-peer set operations
Message sent by client to the service to add or remove an element to/from the set.
Definition: set.h:273
struct GNUNET_SET_Element element
The actual element.
uint32_t operation
Operation to request, values from enum GNUNET_SET_OperationType
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
struct Operation * op
Operation for which the elements should be sent.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
Header for all communications.
#define GNUNET_YES
Definition: gnunet_common.h:77
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set...
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:351
struct ClientState * cs
Client that owns the set.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
SetCreateImpl create
Callback for the set creation.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
struct OperationState * state
Operation-specific operation state.
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:139
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:45
uint32_t salt
Salt used when hashing elements for this inquiry.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
Set union, return all elements that are in at least one of the sets.
static struct SetState * union_copy_state(struct SetState *state)
Copy union-specific set state.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:91
static void union_remove(struct SetState *set_state, struct ElementEntry *ee)
Remove the element given in the element message from the set.
Used as a closure for sending elements with a specific IBF key.
Peer-to-Peer messages for gnunet set.
struct GNUNET_HashCode hash
FIXME.
After sending the full set, wait for responses with the elements that the local peer is missing...
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
#define GNUNET_malloc(size)
Wrapper around malloc.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
int force_full
Always send full sets, even if delta operations would be more efficient.
unsigned int ibf_size
Size of each IBF stratum (in bytes)