GNUnet  0.11.x
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
29 #include "ibf.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_cadet_service.h"
36 #include <gcrypt.h>
37 #include "gnunet_setu_service.h"
38 #include "setu.h"
39 
/**
 * Logging shorthand for this service: forwards to GNUNET_log_from() with
 * the component name fixed to "setu".
 */
#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)

/**
 * Timeout applied to incoming channels (one minute).
 * NOTE(review): the code that arms this timeout is not part of this
 * section of the file -- confirm the exact condition there.
 */
#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES

/**
 * Strata count for the strata estimator.
 * NOTE(review): presumably the number of IBFs per estimator; its use is
 * not visible in this section.
 */
#define SE_STRATA_COUNT 32

/**
 * IBF size parameter handed over when a strata estimator is created.
 */
#define SE_IBF_SIZE 80

/**
 * Hash-count parameter passed to ibf_create() and to the strata estimator.
 */
#define SE_IBF_HASH_NUM 4

/**
 * Upper bound on the number of IBF buckets placed into a single message,
 * i.e. as many buckets as fit into 2^15 bytes.
 */
#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)

/**
 * Largest IBF order the order-selection loop will grow to; an IBF of
 * order n has 2^n buckets.
 */
#define MAX_IBF_ORDER (20)

/**
 * Number of IBF buckets budgeted per estimated differing element.
 */
#define IBF_ALPHA 4
80 
81 
86 {
91 
102 
107 
112 
117 
123 
129 
135 
141 
147 };
148 
149 
156 struct ElementEntry
157 {
163 
169 
173  unsigned int generation;
174 
179  int remote;
180 };
181 
182 
187 struct Listener;
188 
189 
193 struct Set;
194 
195 
/**
 * Per-client bookkeeping: the service-side client handle, its message
 * queue, and the set and/or listener linked to that client.
 * NOTE(review): whether `set` and `listener` are mutually exclusive is
 * not visible in this section -- confirm against the client handlers.
 */
struct ClientState
{
  /**
   * Set linked to this client (presumably NULL if the client has none).
   */
  struct Set *set;

  /**
   * Listener linked to this client (presumably NULL if the client has none).
   */
  struct Listener *listener;

  /**
   * Service handle of the client.
   */
  struct GNUNET_SERVICE_Client *client;

  /**
   * Message queue used to send results to the client
   * (see the `op->set->cs->mq` sends in this file).
   */
  struct GNUNET_MQ_Handle *mq;
};
221 
222 
226 struct Operation
227 {
228 
233  struct GNUNET_PeerIdentity peer;
234 
238  uint64_t initial_size;
239 
243  struct Operation *next;
244 
248  struct Operation *prev;
249 
253  struct GNUNET_CADET_Channel *channel;
254 
258  struct Listener *listener;
259 
263  struct GNUNET_MQ_Handle *mq;
264 
268  struct GNUNET_MessageHeader *context_msg;
269 
274  struct Set *set;
275 
281 
286 
291 
298 
304 
309 
314 
318  int client_done_sent;
319 
323  unsigned int ibf_buckets_received;
324 
328  uint32_t salt_send;
329 
333  uint32_t salt_receive;
334 
339  uint32_t received_fresh;
340 
344  uint32_t received_total;
345 
349  uint32_t salt;
350 
354  uint32_t remote_element_count;
355 
359  uint32_t client_request_id;
360 
365  int force_delta;
366 
371  int force_full;
372 
377  int byzantine;
378 
384 
389  int byzantine_lower_bound;
390 
396  uint32_t suggest_id;
397 
402  unsigned int generation_created;
403 };
404 
405 
/**
 * Element storage of a set, kept in its own structure (referenced from
 * `struct Set` through the `content` pointer).
 */
struct SetContent
{
  /**
   * Map holding the elements.
   * NOTE(review): presumably keyed by element hash with
   * `struct ElementEntry` values -- confirm at the insertion site.
   */
  struct GNUNET_CONTAINER_MultiHashMap *elements;

  /**
   * Reference counter.
   * NOTE(review): who takes and drops references is not visible here.
   */
  unsigned int refcount;

  /**
   * Most recent generation number of this content.
   */
  unsigned int latest_generation;

  /**
   * Iterator counter.
   * NOTE(review): presumably the number of iterations currently running
   * over `elements` -- confirm where it is modified.
   */
  int iterator_count;
};
432 
433 
437 struct Set
438 {
442  struct Set *next;
443 
447  struct Set *prev;
448 
453  struct ClientState *cs;
454 
459  struct SetContent *content;
460 
466 
470  struct Operation *ops_head;
471 
475  struct Operation *ops_tail;
476 
481  unsigned int current_generation;
482 
483 };
484 
485 
489 struct KeyEntry
490 {
494  struct IBF_Key ibf_key;
495 
502  struct ElementEntry *element;
503 
509  int received;
510 };
511 
512 
517 struct SendElementClosure
518 {
523  struct IBF_Key ibf_key;
524 
529  struct Operation *op;
530 };
531 
532 
537 struct Listener
538 {
542  struct Listener *next;
543 
547  struct Listener *prev;
548 
554  struct Operation *op_head;
555 
561  struct Operation *op_tail;
562 
567  struct ClientState *cs;
568 
572  struct GNUNET_CADET_Port *open_port;
573 
578  struct GNUNET_HashCode app_id;
579 
580 };
581 
582 
/**
 * Handle to the CADET service.
 */
static struct GNUNET_CADET_Handle *cadet;

/**
 * First entry of the global list of listeners.
 */
static struct Listener *listener_head;

/**
 * Last entry of the global list of listeners.
 */
static struct Listener *listener_tail;

/**
 * Client counter.
 * NOTE(review): presumably the number of currently connected clients --
 * the code updating it is not in this section.
 */
static unsigned int num_clients;

/**
 * Shutdown flag.
 * NOTE(review): presumably set once service shutdown has started.
 */
static int in_shutdown;

/**
 * Counter for suggestion ids.
 * NOTE(review): presumably the source of `Operation::suggest_id` values --
 * confirm where it is incremented.
 */
static uint32_t suggest_id;
621 
622 
633 static int
635  uint32_t key,
636  void *value)
637 {
638  struct KeyEntry *k = value;
639 
640  GNUNET_assert (NULL != k);
641  if (GNUNET_YES == k->element->remote)
642  {
643  GNUNET_free (k->element);
644  k->element = NULL;
645  }
646  GNUNET_free (k);
647  return GNUNET_YES;
648 }
649 
650 
657 static void
658 send_client_done (void *cls)
659 {
660  struct Operation *op = cls;
661  struct GNUNET_MQ_Envelope *ev;
662  struct GNUNET_SETU_ResultMessage *rm;
663 
664  if (GNUNET_YES == op->client_done_sent)
665  return;
666  if (PHASE_FINISHED != op->phase)
667  {
669  "Union operation failed\n");
670  GNUNET_STATISTICS_update (_GSS_statistics,
671  "# Union operations failed",
672  1,
673  GNUNET_NO);
676  rm->request_id = htonl (op->client_request_id);
677  rm->element_type = htons (0);
678  GNUNET_MQ_send (op->set->cs->mq,
679  ev);
680  return;
681  }
682 
684 
685  GNUNET_STATISTICS_update (_GSS_statistics,
686  "# Union operations succeeded",
687  1,
688  GNUNET_NO);
690  "Signalling client that union operation is done\n");
691  ev = GNUNET_MQ_msg (rm,
693  rm->request_id = htonl (op->client_request_id);
695  rm->element_type = htons (0);
697  op->key_to_element));
698  GNUNET_MQ_send (op->set->cs->mq,
699  ev);
700 }
701 
702 
703 /* FIXME: the destroy logic is a mess and should be cleaned up! */
704 
717 static void
719 {
720  struct Set *set = op->set;
721  struct GNUNET_CADET_Channel *channel;
722 
724  "Destroying union operation %p\n",
725  op);
726  GNUNET_assert (NULL == op->listener);
727  /* check if the op was canceled twice */
728  if (NULL != op->remote_ibf)
729  {
730  ibf_destroy (op->remote_ibf);
731  op->remote_ibf = NULL;
732  }
733  if (NULL != op->demanded_hashes)
734  {
736  op->demanded_hashes = NULL;
737  }
738  if (NULL != op->local_ibf)
739  {
740  ibf_destroy (op->local_ibf);
741  op->local_ibf = NULL;
742  }
743  if (NULL != op->se)
744  {
746  op->se = NULL;
747  }
748  if (NULL != op->key_to_element)
749  {
752  NULL);
754  op->key_to_element = NULL;
755  }
756  if (NULL != set)
757  {
758  GNUNET_CONTAINER_DLL_remove (set->ops_head,
759  set->ops_tail,
760  op);
761  op->set = NULL;
762  }
763  if (NULL != op->context_msg)
764  {
765  GNUNET_free (op->context_msg);
766  op->context_msg = NULL;
767  }
768  if (NULL != (channel = op->channel))
769  {
770  /* This will free op; called conditionally as this helper function
771  is also called from within the channel disconnect handler. */
772  op->channel = NULL;
774  }
775  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
776  * there was a channel end handler that will free 'op' on the call stack. */
777 }
778 
779 
785 static void
787 
788 
794 static void
796 {
797  struct Listener *listener;
798 
800  "Destroying incoming operation %p\n",
801  op);
802  if (NULL != (listener = op->listener))
803  {
805  listener->op_tail,
806  op);
807  op->listener = NULL;
808  }
809  if (NULL != op->timeout_task)
810  {
812  op->timeout_task = NULL;
813  }
815 }
816 
817 
823 static void
825 {
826  struct GNUNET_CADET_Channel *channel;
827 
828  if (NULL != (channel = op->channel))
829  {
830  /* This will free op; called conditionally as this helper function
831  is also called from within the channel disconnect handler. */
832  op->channel = NULL;
834  }
835  if (NULL != op->listener)
836  {
837  incoming_destroy (op);
838  return;
839  }
840  if (NULL != op->set)
841  send_client_done (op);
843  GNUNET_free (op);
844 }
845 
846 
853 static void
855 {
856  struct GNUNET_MQ_Envelope *ev;
858 
860  "union operation failed\n");
863  msg->request_id = htonl (op->client_request_id);
864  msg->element_type = htons (0);
865  GNUNET_MQ_send (op->set->cs->mq,
866  ev);
868 }
869 
870 
878 static struct IBF_Key
879 get_ibf_key (const struct GNUNET_HashCode *src)
880 {
881  struct IBF_Key key;
882  uint16_t salt = 0;
883 
885  GNUNET_CRYPTO_kdf (&key, sizeof(key),
886  src, sizeof *src,
887  &salt, sizeof(salt),
888  NULL, 0));
889  return key;
890 }
891 
892 
896 struct GetElementContext
897 {
901  struct GNUNET_HashCode hash;
902 
906  struct KeyEntry *k;
907 };
908 
909 
920 static int
922  uint32_t key,
923  void *value)
924 {
925  struct GetElementContext *ctx = cls;
926  struct KeyEntry *k = value;
927 
928  GNUNET_assert (NULL != k);
930  &ctx->hash))
931  {
932  ctx->k = k;
933  return GNUNET_NO;
934  }
935  return GNUNET_YES;
936 }
937 
938 
947 static struct KeyEntry *
949  const struct GNUNET_HashCode *element_hash)
950 {
951  int ret;
952  struct IBF_Key ibf_key;
953  struct GetElementContext ctx = { { { 0 } }, 0 };
954 
955  ctx.hash = *element_hash;
956 
957  ibf_key = get_ibf_key (element_hash);
959  (uint32_t) ibf_key.key_val,
961  &ctx);
962 
963  /* was the iteration aborted because we found the element? */
964  if (GNUNET_SYSERR == ret)
965  {
966  GNUNET_assert (NULL != ctx.k);
967  return ctx.k;
968  }
969  return NULL;
970 }
971 
972 
987 static void
989  struct ElementEntry *ee,
990  int received)
991 {
992  struct IBF_Key ibf_key;
993  struct KeyEntry *k;
994 
995  ibf_key = get_ibf_key (&ee->element_hash);
996  k = GNUNET_new (struct KeyEntry);
997  k->element = ee;
998  k->ibf_key = ibf_key;
999  k->received = received;
1002  (uint32_t) ibf_key.key_val,
1003  k,
1005 }
1006 
1007 
1012 static void
1013 salt_key (const struct IBF_Key *k_in,
1014  uint32_t salt,
1015  struct IBF_Key *k_out)
1016 {
1017  int s = salt % 64;
1018  uint64_t x = k_in->key_val;
1019 
1020  /* rotate ibf key */
1021  x = (x >> s) | (x << (64 - s));
1022  k_out->key_val = x;
1023 }
1024 
1025 
1029 static void
1030 unsalt_key (const struct IBF_Key *k_in,
1031  uint32_t salt,
1032  struct IBF_Key *k_out)
1033 {
1034  int s = salt % 64;
1035  uint64_t x = k_in->key_val;
1036 
1037  x = (x << s) | (x >> (64 - s));
1038  k_out->key_val = x;
1039 }
1040 
1041 
1049 static int
1051  uint32_t key,
1052  void *value)
1053 {
1054  struct Operation *op = cls;
1055  struct KeyEntry *ke = value;
1056  struct IBF_Key salted_key;
1057 
1059  "[OP %p] inserting %lx (hash %s) into ibf\n",
1060  op,
1061  (unsigned long) ke->ibf_key.key_val,
1062  GNUNET_h2s (&ke->element->element_hash));
1063  salt_key (&ke->ibf_key,
1064  op->salt_send,
1065  &salted_key);
1066  ibf_insert (op->local_ibf, salted_key);
1067  return GNUNET_YES;
1068 }
1069 
1070 
1078 static int
1080  struct Operation *op)
1081 {
1082  return ee->generation >= op->generation_created;
1083 }
1084 
1085 
1096 static int
1098  const struct GNUNET_HashCode *key,
1099  void *value)
1100 {
1101  struct Operation *op = cls;
1102  struct ElementEntry *ee = value;
1103 
1104  /* make sure that the element belongs to the set at the time
1105  * of creating the operation */
1106  if (GNUNET_NO ==
1108  op))
1109  return GNUNET_YES;
1110  GNUNET_assert (GNUNET_NO == ee->remote);
1111  op_register_element (op,
1112  ee,
1113  GNUNET_NO);
1114  return GNUNET_YES;
1115 }
1116 
1117 
1123 static void
1125 {
1126  unsigned int len;
1127 
1128  GNUNET_assert (NULL == op->key_to_element);
1133  op);
1134 }
1135 
1136 
1145 static int
1147  uint32_t size)
1148 {
1149  GNUNET_assert (NULL != op->key_to_element);
1150 
1151  if (NULL != op->local_ibf)
1152  ibf_destroy (op->local_ibf);
1153  op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
1154  if (NULL == op->local_ibf)
1155  {
1157  "Failed to allocate local IBF\n");
1158  return GNUNET_SYSERR;
1159  }
1162  op);
1163  return GNUNET_OK;
1164 }
1165 
1166 
1176 static int
1177 send_ibf (struct Operation *op,
1178  uint16_t ibf_order)
1179 {
1180  unsigned int buckets_sent = 0;
1181  struct InvertibleBloomFilter *ibf;
1182 
1183  if (GNUNET_OK !=
1184  prepare_ibf (op, 1 << ibf_order))
1185  {
1186  /* allocation failed */
1187  return GNUNET_SYSERR;
1188  }
1189 
1191  "sending ibf of size %u\n",
1192  1 << ibf_order);
1193 
1194  {
1195  char name[64] = { 0 };
1196  snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
1197  GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1198  }
1199 
1200  ibf = op->local_ibf;
1201 
1202  while (buckets_sent < (1 << ibf_order))
1203  {
1204  unsigned int buckets_in_message;
1205  struct GNUNET_MQ_Envelope *ev;
1206  struct IBFMessage *msg;
1207 
1208  buckets_in_message = (1 << ibf_order) - buckets_sent;
1209  /* limit to maximum */
1210  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1211  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1212 
1213  ev = GNUNET_MQ_msg_extra (msg,
1214  buckets_in_message * IBF_BUCKET_SIZE,
1216  msg->reserved1 = 0;
1217  msg->reserved2 = 0;
1218  msg->order = ibf_order;
1219  msg->offset = htonl (buckets_sent);
1220  msg->salt = htonl (op->salt_send);
1221  ibf_write_slice (ibf, buckets_sent,
1222  buckets_in_message, &msg[1]);
1223  buckets_sent += buckets_in_message;
1225  "ibf chunk size %u, %u/%u sent\n",
1226  buckets_in_message,
1227  buckets_sent,
1228  1 << ibf_order);
1229  GNUNET_MQ_send (op->mq, ev);
1230  }
1231 
1232  /* The other peer must decode the IBF, so
1233  * we're passive. */
1235  return GNUNET_OK;
1236 }
1237 
1238 
1246 static unsigned int
1247 get_order_from_difference (unsigned int diff)
1248 {
1249  unsigned int ibf_order;
1250 
1251  ibf_order = 2;
1252  while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
1253  ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
1254  (ibf_order < MAX_IBF_ORDER))
1255  ibf_order++;
1256  // add one for correction
1257  return ibf_order + 1;
1258 }
1259 
1260 
1270 static int
1272  const struct GNUNET_HashCode *key,
1273  void *value)
1274 {
1275  struct Operation *op = cls;
1276  struct GNUNET_SETU_ElementMessage *emsg;
1277  struct ElementEntry *ee = value;
1278  struct GNUNET_SETU_Element *el = &ee->element;
1279  struct GNUNET_MQ_Envelope *ev;
1280 
1282  "Sending element %s\n",
1283  GNUNET_h2s (key));
1284  ev = GNUNET_MQ_msg_extra (emsg,
1285  el->size,
1287  emsg->element_type = htons (el->element_type);
1288  GNUNET_memcpy (&emsg[1],
1289  el->data,
1290  el->size);
1291  GNUNET_MQ_send (op->mq,
1292  ev);
1293  return GNUNET_YES;
1294 }
1295 
1296 
1302 static void
1304 {
1305  struct GNUNET_MQ_Envelope *ev;
1306 
1307  op->phase = PHASE_FULL_SENDING;
1309  "Dedicing to transmit the full set\n");
1310  /* FIXME: use a more memory-friendly way of doing this with an
1311  iterator, just as we do in the non-full case! */
1314  op);
1316  GNUNET_MQ_send (op->mq,
1317  ev);
1318 }
1319 
1320 
1327 static int
1329  const struct StrataEstimatorMessage *msg)
1330 {
1331  struct Operation *op = cls;
1332  int is_compressed;
1333  size_t len;
1334 
1335  if (op->phase != PHASE_EXPECT_SE)
1336  {
1337  GNUNET_break (0);
1338  return GNUNET_SYSERR;
1339  }
1340  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1341  msg->header.type));
1342  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1343  if ((GNUNET_NO == is_compressed) &&
1345  {
1346  GNUNET_break (0);
1347  return GNUNET_SYSERR;
1348  }
1349  return GNUNET_OK;
1350 }
1351 
1352 
1359 static void
1361  const struct StrataEstimatorMessage *msg)
1362 {
1363  struct Operation *op = cls;
1364  struct StrataEstimator *remote_se;
1365  unsigned int diff;
1366  uint64_t other_size;
1367  size_t len;
1368  int is_compressed;
1369 
1370  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1371  msg->header.type));
1372  GNUNET_STATISTICS_update (_GSS_statistics,
1373  "# bytes of SE received",
1374  ntohs (msg->header.size),
1375  GNUNET_NO);
1376  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1377  other_size = GNUNET_ntohll (msg->set_size);
1379  SE_IBF_SIZE,
1380  SE_IBF_HASH_NUM);
1381  if (NULL == remote_se)
1382  {
1383  /* insufficient resources, fail */
1384  fail_union_operation (op);
1385  return;
1386  }
1387  if (GNUNET_OK !=
1388  strata_estimator_read (&msg[1],
1389  len,
1390  is_compressed,
1391  remote_se))
1392  {
1393  /* decompression failed */
1394  strata_estimator_destroy (remote_se);
1395  fail_union_operation (op);
1396  return;
1397  }
1398  GNUNET_assert (NULL != op->se);
1399  diff = strata_estimator_difference (remote_se,
1400  op->se);
1401 
1402  if (diff > 200)
1403  diff = diff * 3 / 2;
1404 
1405  strata_estimator_destroy (remote_se);
1407  op->se = NULL;
1409  "got se diff=%d, using ibf size %d\n",
1410  diff,
1411  1U << get_order_from_difference (diff));
1412 
1413  {
1414  char *set_debug;
1415 
1416  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1417  if ((NULL != set_debug) &&
1418  (0 == strcmp (set_debug, "1")))
1419  {
1420  FILE *f = fopen ("set.log", "a");
1421  fprintf (f, "%llu\n", (unsigned long long) diff);
1422  fclose (f);
1423  }
1424  }
1425 
1426  if ((GNUNET_YES == op->byzantine) &&
1427  (other_size < op->byzantine_lower_bound))
1428  {
1429  GNUNET_break (0);
1430  fail_union_operation (op);
1431  return;
1432  }
1433 
1434  if ((GNUNET_YES == op->force_full) ||
1435  (diff > op->initial_size / 4) ||
1436  (0 == other_size))
1437  {
1439  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
1440  diff,
1441  (unsigned long long) op->initial_size);
1442  GNUNET_STATISTICS_update (_GSS_statistics,
1443  "# of full sends",
1444  1,
1445  GNUNET_NO);
1446  if ((op->initial_size <= other_size) ||
1447  (0 == other_size))
1448  {
1449  send_full_set (op);
1450  }
1451  else
1452  {
1453  struct GNUNET_MQ_Envelope *ev;
1454 
1456  "Telling other peer that we expect its full set\n");
1458  ev = GNUNET_MQ_msg_header (
1460  GNUNET_MQ_send (op->mq,
1461  ev);
1462  }
1463  }
1464  else
1465  {
1466  GNUNET_STATISTICS_update (_GSS_statistics,
1467  "# of ibf sends",
1468  1,
1469  GNUNET_NO);
1470  if (GNUNET_OK !=
1471  send_ibf (op,
1472  get_order_from_difference (diff)))
1473  {
1474  /* Internal error, best we can do is shut the connection */
1476  "Failed to send IBF, closing connection\n");
1477  fail_union_operation (op);
1478  return;
1479  }
1480  }
1482 }
1483 
1484 
1492 static int
1494  uint32_t key,
1495  void *value)
1496 {
1497  struct SendElementClosure *sec = cls;
1498  struct Operation *op = sec->op;
1499  struct KeyEntry *ke = value;
1500  struct GNUNET_MQ_Envelope *ev;
1501  struct GNUNET_MessageHeader *mh;
1502 
1503  /* Detect 32-bit key collision for the 64-bit IBF keys. */
1504  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1505  return GNUNET_YES;
1506 
1507  ev = GNUNET_MQ_msg_header_extra (mh,
1508  sizeof(struct GNUNET_HashCode),
1510 
1511  GNUNET_assert (NULL != ev);
1512  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1514  "[OP %p] sending element offer (%s) to peer\n",
1515  op,
1516  GNUNET_h2s (&ke->element->element_hash));
1517  GNUNET_MQ_send (op->mq, ev);
1518  return GNUNET_YES;
1519 }
1520 
1521 
1528 static void
1530  struct IBF_Key ibf_key)
1531 {
1532  struct SendElementClosure send_cls;
1533 
1534  send_cls.ibf_key = ibf_key;
1535  send_cls.op = op;
1537  op->key_to_element,
1538  (uint32_t) ibf_key.
1539  key_val,
1541  &send_cls);
1542 }
1543 
1544 
1552 static int
1554 {
1555  struct IBF_Key key;
1556  struct IBF_Key last_key;
1557  int side;
1558  unsigned int num_decoded;
1559  struct InvertibleBloomFilter *diff_ibf;
1560 
1562 
1563  if (GNUNET_OK !=
1564  prepare_ibf (op,
1565  op->remote_ibf->size))
1566  {
1567  GNUNET_break (0);
1568  /* allocation failed */
1569  return GNUNET_SYSERR;
1570  }
1571  diff_ibf = ibf_dup (op->local_ibf);
1572  ibf_subtract (diff_ibf,
1573  op->remote_ibf);
1574 
1575  ibf_destroy (op->remote_ibf);
1576  op->remote_ibf = NULL;
1577 
1579  "decoding IBF (size=%u)\n",
1580  diff_ibf->size);
1581 
1582  num_decoded = 0;
1583  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1584 
1585  while (1)
1586  {
1587  int res;
1588  int cycle_detected = GNUNET_NO;
1589 
1590  last_key = key;
1591 
1592  res = ibf_decode (diff_ibf,
1593  &side,
1594  &key);
1595  if (res == GNUNET_OK)
1596  {
1598  "decoded ibf key %lx\n",
1599  (unsigned long) key.key_val);
1600  num_decoded += 1;
1601  if ((num_decoded > diff_ibf->size) ||
1602  ((num_decoded > 1) &&
1603  (last_key.key_val == key.key_val)))
1604  {
1606  "detected cyclic ibf (decoded %u/%u)\n",
1607  num_decoded,
1608  diff_ibf->size);
1609  cycle_detected = GNUNET_YES;
1610  }
1611  }
1612  if ((GNUNET_SYSERR == res) ||
1613  (GNUNET_YES == cycle_detected))
1614  {
1615  int next_order;
1616  next_order = 0;
1617  while (1 << next_order < diff_ibf->size)
1618  next_order++;
1619  next_order++;
1620  if (next_order <= MAX_IBF_ORDER)
1621  {
1623  "decoding failed, sending larger ibf (size %u)\n",
1624  1 << next_order);
1625  GNUNET_STATISTICS_update (_GSS_statistics,
1626  "# of IBF retries",
1627  1,
1628  GNUNET_NO);
1629  op->salt_send++;
1630  if (GNUNET_OK !=
1631  send_ibf (op, next_order))
1632  {
1633  /* Internal error, best we can do is shut the connection */
1635  "Failed to send IBF, closing connection\n");
1636  fail_union_operation (op);
1637  ibf_destroy (diff_ibf);
1638  return GNUNET_SYSERR;
1639  }
1640  }
1641  else
1642  {
1643  GNUNET_STATISTICS_update (_GSS_statistics,
1644  "# of failed union operations (too large)",
1645  1,
1646  GNUNET_NO);
1647  // XXX: Send the whole set, element-by-element
1649  "set union failed: reached ibf limit\n");
1650  fail_union_operation (op);
1651  ibf_destroy (diff_ibf);
1652  return GNUNET_SYSERR;
1653  }
1654  break;
1655  }
1656  if (GNUNET_NO == res)
1657  {
1658  struct GNUNET_MQ_Envelope *ev;
1659 
1661  "transmitted all values, sending DONE\n");
1663  GNUNET_MQ_send (op->mq, ev);
1664  /* We now wait until we get a DONE message back
1665  * and then wait for our MQ to be flushed and all our
1666  * demands be delivered. */
1667  break;
1668  }
1669  if (1 == side)
1670  {
1671  struct IBF_Key unsalted_key;
1672 
1673  unsalt_key (&key,
1674  op->salt_receive,
1675  &unsalted_key);
1676  send_offers_for_key (op,
1677  unsalted_key);
1678  }
1679  else if (-1 == side)
1680  {
1681  struct GNUNET_MQ_Envelope *ev;
1682  struct InquiryMessage *msg;
1683 
1684  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1685  * the effort additional complexity. */
1686  ev = GNUNET_MQ_msg_extra (msg,
1687  sizeof(struct IBF_Key),
1689  msg->salt = htonl (op->salt_receive);
1690  GNUNET_memcpy (&msg[1],
1691  &key,
1692  sizeof(struct IBF_Key));
1694  "sending element inquiry for IBF key %lx\n",
1695  (unsigned long) key.key_val);
1696  GNUNET_MQ_send (op->mq, ev);
1697  }
1698  else
1699  {
1700  GNUNET_assert (0);
1701  }
1702  }
1703  ibf_destroy (diff_ibf);
1704  return GNUNET_OK;
1705 }
1706 
1707 
1718 static int
1720  const struct IBFMessage *msg)
1721 {
1722  struct Operation *op = cls;
1723  unsigned int buckets_in_message;
1724 
1725  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1726  / IBF_BUCKET_SIZE;
1727  if (0 == buckets_in_message)
1728  {
1729  GNUNET_break_op (0);
1730  return GNUNET_SYSERR;
1731  }
1732  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1733  * IBF_BUCKET_SIZE)
1734  {
1735  GNUNET_break_op (0);
1736  return GNUNET_SYSERR;
1737  }
1738  if (op->phase == PHASE_EXPECT_IBF_LAST)
1739  {
1740  if (ntohl (msg->offset) != op->ibf_buckets_received)
1741  {
1742  GNUNET_break_op (0);
1743  return GNUNET_SYSERR;
1744  }
1745  if (1 << msg->order != op->remote_ibf->size)
1746  {
1747  GNUNET_break_op (0);
1748  return GNUNET_SYSERR;
1749  }
1750  if (ntohl (msg->salt) != op->salt_receive)
1751  {
1752  GNUNET_break_op (0);
1753  return GNUNET_SYSERR;
1754  }
1755  }
1756  else if ((op->phase != PHASE_PASSIVE_DECODING) &&
1757  (op->phase != PHASE_EXPECT_IBF))
1758  {
1759  GNUNET_break_op (0);
1760  return GNUNET_SYSERR;
1761  }
1762 
1763  return GNUNET_OK;
1764 }
1765 
1766 
1776 static void
1778  const struct IBFMessage *msg)
1779 {
1780  struct Operation *op = cls;
1781  unsigned int buckets_in_message;
1782 
1783  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1784  / IBF_BUCKET_SIZE;
1785  if ((op->phase == PHASE_PASSIVE_DECODING) ||
1786  (op->phase == PHASE_EXPECT_IBF))
1787  {
1789  GNUNET_assert (NULL == op->remote_ibf);
1791  "Creating new ibf of size %u\n",
1792  1 << msg->order);
1793  op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1794  op->salt_receive = ntohl (msg->salt);
1796  "Receiving new IBF with salt %u\n",
1797  op->salt_receive);
1798  if (NULL == op->remote_ibf)
1799  {
1801  "Failed to parse remote IBF, closing connection\n");
1802  fail_union_operation (op);
1803  return;
1804  }
1805  op->ibf_buckets_received = 0;
1806  if (0 != ntohl (msg->offset))
1807  {
1808  GNUNET_break_op (0);
1809  fail_union_operation (op);
1810  return;
1811  }
1812  }
1813  else
1814  {
1817  "Received more of IBF\n");
1818  }
1819  GNUNET_assert (NULL != op->remote_ibf);
1820 
1821  ibf_read_slice (&msg[1],
1823  buckets_in_message,
1824  op->remote_ibf);
1825  op->ibf_buckets_received += buckets_in_message;
1826 
1827  if (op->ibf_buckets_received == op->remote_ibf->size)
1828  {
1830  "received full ibf\n");
1832  if (GNUNET_OK !=
1833  decode_and_send (op))
1834  {
1835  /* Internal error, best we can do is shut down */
1837  "Failed to decode IBF, closing connection\n");
1838  fail_union_operation (op);
1839  return;
1840  }
1841  }
1843 }
1844 
1845 
1854 static void
1856  const struct GNUNET_SETU_Element *element,
1858 {
1859  struct GNUNET_MQ_Envelope *ev;
1860  struct GNUNET_SETU_ResultMessage *rm;
1861 
1863  "sending element (size %u) to client\n",
1864  element->size);
1865  GNUNET_assert (0 != op->client_request_id);
1866  ev = GNUNET_MQ_msg_extra (rm,
1867  element->size,
1869  if (NULL == ev)
1870  {
1871  GNUNET_MQ_discard (ev);
1872  GNUNET_break (0);
1873  return;
1874  }
1875  rm->result_status = htons (status);
1876  rm->request_id = htonl (op->client_request_id);
1877  rm->element_type = htons (element->element_type);
1879  op->key_to_element));
1880  GNUNET_memcpy (&rm[1],
1881  element->data,
1882  element->size);
1883  GNUNET_MQ_send (op->set->cs->mq,
1884  ev);
1885 }
1886 
1887 
1893 static void
1895 {
1896  unsigned int num_demanded;
1897 
1898  num_demanded = GNUNET_CONTAINER_multihashmap_size (
1899  op->demanded_hashes);
1900 
1901  if (PHASE_FINISH_WAITING == op->phase)
1902  {
1904  "In PHASE_FINISH_WAITING, pending %u demands\n",
1905  num_demanded);
1906  if (0 == num_demanded)
1907  {
1908  struct GNUNET_MQ_Envelope *ev;
1909 
1910  op->phase = PHASE_FINISHED;
1912  GNUNET_MQ_send (op->mq,
1913  ev);
1914  /* We now wait until the other peer sends P2P_OVER
1915  * after it got all elements from us. */
1916  }
1917  }
1918  if (PHASE_FINISH_CLOSING == op->phase)
1919  {
1921  "In PHASE_FINISH_CLOSING, pending %u demands\n",
1922  num_demanded);
1923  if (0 == num_demanded)
1924  {
1925  op->phase = PHASE_FINISHED;
1926  send_client_done (op);
1928  }
1929  }
1930 }
1931 
1932 
1939 static int
1941  const struct GNUNET_SETU_ElementMessage *emsg)
1942 {
1943  struct Operation *op = cls;
1944 
1946  {
1947  GNUNET_break_op (0);
1948  return GNUNET_SYSERR;
1949  }
1950  return GNUNET_OK;
1951 }
1952 
1953 
1962 static void
1964  const struct GNUNET_SETU_ElementMessage *emsg)
1965 {
1966  struct Operation *op = cls;
1967  struct ElementEntry *ee;
1968  struct KeyEntry *ke;
1969  uint16_t element_size;
1970 
1971  element_size = ntohs (emsg->header.size) - sizeof(struct
1973  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1974  GNUNET_memcpy (&ee[1],
1975  &emsg[1],
1976  element_size);
1977  ee->element.size = element_size;
1978  ee->element.data = &ee[1];
1979  ee->element.element_type = ntohs (emsg->element_type);
1980  ee->remote = GNUNET_YES;
1982  &ee->element_hash);
1983  if (GNUNET_NO ==
1985  &ee->element_hash,
1986  NULL))
1987  {
1988  /* We got something we didn't demand, since it's not in our map. */
1989  GNUNET_break_op (0);
1990  fail_union_operation (op);
1991  return;
1992  }
1993 
1995  "Got element (size %u, hash %s) from peer\n",
1996  (unsigned int) element_size,
1997  GNUNET_h2s (&ee->element_hash));
1998 
1999  GNUNET_STATISTICS_update (_GSS_statistics,
2000  "# received elements",
2001  1,
2002  GNUNET_NO);
2003  GNUNET_STATISTICS_update (_GSS_statistics,
2004  "# exchanged elements",
2005  1,
2006  GNUNET_NO);
2007 
2008  op->received_total++;
2009 
2010  ke = op_get_element (op,
2011  &ee->element_hash);
2012  if (NULL != ke)
2013  {
2014  /* Got repeated element. Should not happen since
2015  * we track demands. */
2016  GNUNET_STATISTICS_update (_GSS_statistics,
2017  "# repeated elements",
2018  1,
2019  GNUNET_NO);
2020  ke->received = GNUNET_YES;
2021  GNUNET_free (ee);
2022  }
2023  else
2024  {
2026  "Registering new element from remote peer\n");
2027  op->received_fresh++;
2028  op_register_element (op, ee, GNUNET_YES);
2029  /* only send results immediately if the client wants it */
2030  send_client_element (op,
2031  &ee->element,
2033  }
2034 
2035  if ((op->received_total > 8) &&
2036  (op->received_fresh < op->received_total / 3))
2037  {
2038  /* The other peer gave us lots of old elements, there's something wrong. */
2039  GNUNET_break_op (0);
2040  fail_union_operation (op);
2041  return;
2042  }
2044  maybe_finish (op);
2045 }
2046 
2047 
2054 static int
2056  const struct GNUNET_SETU_ElementMessage *emsg)
2057 {
2058  struct Operation *op = cls;
2059 
2060  (void) op;
2061 
2062  // FIXME: check that we expect full elements here?
2063  return GNUNET_OK;
2064 }
2065 
2066 
2073 static void
2075  const struct GNUNET_SETU_ElementMessage *emsg)
2076 {
2077  struct Operation *op = cls;
2078  struct ElementEntry *ee;
2079  struct KeyEntry *ke;
2080  uint16_t element_size;
2081 
2082 
2083  /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2084  if ( (PHASE_EXPECT_IBF != op->phase) &&
2085  (PHASE_FULL_RECEIVING != op->phase) )
2086  {
2087  GNUNET_break_op (0);
2088  fail_union_operation (op);
2089  return;
2090  }
2091 
2093 
2094  element_size = ntohs (emsg->header.size)
2095  - sizeof(struct GNUNET_SETU_ElementMessage);
2096  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2097  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
2098  ee->element.size = element_size;
2099  ee->element.data = &ee[1];
2100  ee->element.element_type = ntohs (emsg->element_type);
2101  ee->remote = GNUNET_YES;
2103  &ee->element_hash);
2105  "Got element (full diff, size %u, hash %s) from peer\n",
2106  (unsigned int) element_size,
2107  GNUNET_h2s (&ee->element_hash));
2108 
2109  GNUNET_STATISTICS_update (_GSS_statistics,
2110  "# received elements",
2111  1,
2112  GNUNET_NO);
2113  GNUNET_STATISTICS_update (_GSS_statistics,
2114  "# exchanged elements",
2115  1,
2116  GNUNET_NO);
2117 
2118  op->received_total++;
2119 
2120  ke = op_get_element (op,
2121  &ee->element_hash);
2122  if (NULL != ke)
2123  {
2124  /* Got repeated element. Should not happen since
2125  * we track demands. */
2126  GNUNET_STATISTICS_update (_GSS_statistics,
2127  "# repeated elements",
2128  1,
2129  GNUNET_NO);
2130  ke->received = GNUNET_YES;
2131  GNUNET_free (ee);
2132  }
2133  else
2134  {
2136  "Registering new element from remote peer\n");
2137  op->received_fresh++;
2138  op_register_element (op, ee, GNUNET_YES);
2139  /* only send results immediately if the client wants it */
2140  send_client_element (op,
2141  &ee->element,
2143  }
2144 
2145  if ((GNUNET_YES == op->byzantine) &&
2146  (op->received_total > 384 + op->received_fresh * 4) &&
2147  (op->received_fresh < op->received_total / 6))
2148  {
2149  /* The other peer gave us lots of old elements, there's something wrong. */
2151  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
2152  (unsigned long long) op->received_fresh,
2153  (unsigned long long) op->received_total);
2154  GNUNET_break_op (0);
2155  fail_union_operation (op);
2156  return;
2157  }
2159 }
2160 
2161 
/**
 * Validate an inquiry message from the remote peer.
 *
 * NOTE(review): the line carrying the function name is absent from this
 * listing; presumably this is the check routine paired with the
 * union_p2p_inquiry handler -- confirm.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param msg the inquiry message to validate
 * @return #GNUNET_OK if the operation is in PHASE_PASSIVE_DECODING and the
 *         payload after the header is a whole number of `struct IBF_Key`s,
 *         #GNUNET_SYSERR otherwise
 */
2169 static int
2171  const struct InquiryMessage *msg)
2172 {
2173  struct Operation *op = cls;
2174  unsigned int num_keys;
2175 
   /* inquiries are only accepted in the passive decoding phase */
2176  if (op->phase != PHASE_PASSIVE_DECODING)
2177  {
2178  GNUNET_break_op (0);
2179  return GNUNET_SYSERR;
2180  }
   /* integer division, then multiply back: rejects a trailing partial key */
2181  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2182  / sizeof(struct IBF_Key);
2183  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2184  != num_keys * sizeof(struct IBF_Key))
2185  {
2186  GNUNET_break_op (0);
2187  return GNUNET_SYSERR;
2188  }
2189  return GNUNET_OK;
2190 }
2191 
2192 
/**
 * Process an inquiry message: for every IBF key carried in the payload,
 * run it through unsalt_key() with the salt from the message and pass the
 * result to send_offers_for_key().
 *
 * NOTE(review): the function-name line and the final statement are absent
 * from this listing; presumably the handler paired with the
 * union_p2p_inquiry check -- confirm.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param msg the inquiry message; the keys start directly after the header
 */
2199 static void
2201  const struct InquiryMessage *msg)
2202 {
2203  struct Operation *op = cls;
2204  const struct IBF_Key *ibf_key;
2205  unsigned int num_keys;
2206 
2208  "Received union inquiry\n");
   /* number of keys is derived from the message size */
2209  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2210  / sizeof(struct IBF_Key);
2211  ibf_key = (const struct IBF_Key *) &msg[1];
2212  while (0 != num_keys--)
2213  {
2214  struct IBF_Key unsalted_key;
2215 
2216  unsalt_key (ibf_key,
2217  ntohl (msg->salt),
2218  &unsalted_key);
2219  send_offers_for_key (op,
2220  unsalted_key);
2221  ibf_key++;
2222  }
2224 }
2225 
2226 
/**
 * Iterator callback over key entries: sends the entry's element to the
 * remote peer unless the entry is already marked as received.
 *
 * NOTE(review): the function-name line and the message-type argument of
 * GNUNET_MQ_msg_extra() are absent from this listing.
 *
 * @param cls the `struct Operation *` whose message queue is used
 * @param key unused here
 * @param value the `struct KeyEntry *` being visited
 * @return always #GNUNET_YES (presumably "keep iterating" -- confirm
 *         against the container API)
 */
2237 static int
2239  uint32_t key,
2240  void *value)
2241 {
2242  struct Operation *op = cls;
2243  struct KeyEntry *ke = value;
2244  struct GNUNET_MQ_Envelope *ev;
2245  struct GNUNET_SETU_ElementMessage *emsg;
2246  struct ElementEntry *ee = ke->element;
2247 
   /* entries flagged as received are skipped */
2248  if (GNUNET_YES == ke->received)
2249  return GNUNET_YES;
   /* message with room for the element payload after the fixed header */
2250  ev = GNUNET_MQ_msg_extra (emsg,
2251  ee->element.size,
2253  GNUNET_memcpy (&emsg[1],
2254  ee->element.data,
2255  ee->element.size);
2256  emsg->element_type = htons (ee->element.element_type);
2257  GNUNET_MQ_send (op->mq,
2258  ev);
2259  return GNUNET_YES;
2260 }
2261 
2262 
/**
 * Handle the remote peer's request for a full set transmission.
 * The request is only honoured in PHASE_EXPECT_IBF; in any other phase
 * the operation is failed. Otherwise send_full_set() is called.
 *
 * NOTE(review): the function-name line, the log call and the final
 * statement are absent from this listing.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the request message (not inspected in the visible code)
 */
2269 static void
2271  const struct GNUNET_MessageHeader *mh)
2272 {
2273  struct Operation *op = cls;
2274 
2276  "Received request for full set transmission\n");
2277  if (PHASE_EXPECT_IBF != op->phase)
2278  {
2279  GNUNET_break_op (0);
2280  fail_union_operation (op);
2281  return;
2282  }
2283 
2284  // FIXME: we need to check that our set is larger than the
2285  // byzantine_lower_bound by some threshold
2286  send_full_set (op);
2288 }
2289 
2290 
/**
 * Handle a "full done" message from the remote peer.
 *
 * Behaviour depends on op->phase:
 *  - PHASE_EXPECT_IBF: an envelope is sent to the peer and the phase
 *    becomes PHASE_FINISHED;
 *  - PHASE_FULL_SENDING: the phase becomes PHASE_FINISHED and
 *    send_client_done() is called;
 *  - anything else: the operation is failed.
 *
 * NOTE(review): several lines (function name, log calls, the iteration
 * over missing elements, the creation of `ev`) are absent from this
 * listing, so the above only covers what is visible.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the message (not inspected in the visible code)
 */
2297 static void
2299  const struct GNUNET_MessageHeader *mh)
2300 {
2301  struct Operation *op = cls;
2302 
2303  switch (op->phase)
2304  {
2305  case PHASE_EXPECT_IBF:
2306  {
2307  struct GNUNET_MQ_Envelope *ev;
2308 
2310  "got FULL DONE, sending elements that other peer is missing\n");
2311 
2312  /* send all the elements that did not come from the remote peer */
2315  op);
2317  GNUNET_MQ_send (op->mq,
2318  ev);
2319  op->phase = PHASE_FINISHED;
2320  /* we now wait until the other peer sends us the OVER message */
2321  }
2322  break;
2323 
2324  case PHASE_FULL_SENDING:
2325  {
2327  "got FULL DONE, finishing\n");
2328  /* We sent the full set, and got the response for that. We're done. */
2329  op->phase = PHASE_FINISHED;
2331  send_client_done (op);
2333  return;
2334  }
   /* NOTE(review): unreachable, the block above always returns */
2335  break;
2336 
2337  default:
2339  "Handle full done phase is %u\n",
2340  (unsigned) op->phase);
2341  GNUNET_break_op (0);
2342  fail_union_operation (op);
2343  return;
2344  }
2346 }
2347 
2348 
/**
 * Check a demand by the other peer for elements based on a list of
 * `struct GNUNET_HashCode`s: the payload after the header must be a
 * whole number of hash codes.
 *
 * NOTE(review): unlike the offer check, no phase check is performed
 * here; `op` is deliberately unused.
 *
 * @param cls the `struct Operation *` (unused)
 * @param mh the demand message
 * @return #GNUNET_OK if the payload size is a multiple of
 *         `sizeof(struct GNUNET_HashCode)`, #GNUNET_SYSERR otherwise
 */
2357 static int
2359  const struct GNUNET_MessageHeader *mh)
2360 {
2361  struct Operation *op = cls;
2362  unsigned int num_hashes;
2363 
2364  (void) op;
   /* integer division, then multiply back: rejects a trailing partial hash */
2365  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2366  / sizeof(struct GNUNET_HashCode);
2367  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2368  != num_hashes * sizeof(struct GNUNET_HashCode))
2369  {
2370  GNUNET_break_op (0);
2371  return GNUNET_SYSERR;
2372  }
2373  return GNUNET_OK;
2374 }
2375 
2376 
/**
 * Answer a demand from the remote peer: for each hash in the message,
 * look up the element, verify that it belongs to this operation and send
 * it to the peer. A demand for an unknown element, or for one that is
 * not part of the operation, fails the whole operation.
 *
 * NOTE(review): the function-name line, the element lookup call, the
 * message-type argument, the log call and the trailing statements are
 * absent from this listing.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the demand message; hash codes follow the header
 */
2384 static void
2386  const struct GNUNET_MessageHeader *mh)
2387 {
2388  struct Operation *op = cls;
2389  struct ElementEntry *ee;
2390  struct GNUNET_SETU_ElementMessage *emsg;
2391  const struct GNUNET_HashCode *hash;
2392  unsigned int num_hashes;
2393  struct GNUNET_MQ_Envelope *ev;
2394 
2395  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2396  / sizeof(struct GNUNET_HashCode);
2397  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2398  num_hashes > 0;
2399  hash++, num_hashes--)
2400  {
2402  hash);
2403  if (NULL == ee)
2404  {
2405  /* Demand for non-existing element. */
2406  GNUNET_break_op (0);
2407  fail_union_operation (op);
2408  return;
2409  }
2410  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2411  {
2412  /* Probably confused lazily copied sets. */
2413  GNUNET_break_op (0);
2414  fail_union_operation (op);
2415  return;
2416  }
   /* build the element message: header fields plus the raw element bytes */
2417  ev = GNUNET_MQ_msg_extra (emsg,
2418  ee->element.size,
2420  GNUNET_memcpy (&emsg[1],
2421  ee->element.data,
2422  ee->element.size);
2423  emsg->reserved = htons (0);
2424  emsg->element_type = htons (ee->element.element_type);
2426  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
2427  op,
2428  (unsigned int) ee->element.size,
2429  GNUNET_h2s (&ee->element_hash));
2430  GNUNET_MQ_send (op->mq, ev);
2431  GNUNET_STATISTICS_update (_GSS_statistics,
2432  "# exchanged elements",
2433  1,
2434  GNUNET_NO);
   /* in symmetric mode the local client is also told about the element */
2435  if (op->symmetric)
2436  send_client_element (op,
2437  &ee->element,
2439  }
2441 }
2442 
2443 
/**
 * Validate an offer message from the remote peer.
 *
 * NOTE(review): the function-name line is absent from this listing;
 * presumably the check routine paired with the union_p2p_offer
 * handler -- confirm.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the offer message
 * @return #GNUNET_OK if the operation is in PHASE_PASSIVE_DECODING or
 *         PHASE_ACTIVE_DECODING and the payload is a whole number of
 *         `struct GNUNET_HashCode`s, #GNUNET_SYSERR otherwise
 */
2451 static int
2453  const struct GNUNET_MessageHeader *mh)
2454 {
2455  struct Operation *op = cls;
2456  unsigned int num_hashes;
2457 
2458  /* offers are only valid while decoding (passively or actively) */
2459  if ((op->phase != PHASE_PASSIVE_DECODING) &&
2460  (op->phase != PHASE_ACTIVE_DECODING))
2461  {
2462  GNUNET_break_op (0);
2463  return GNUNET_SYSERR;
2464  }
2465  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2466  / sizeof(struct GNUNET_HashCode);
2467  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2468  num_hashes * sizeof(struct GNUNET_HashCode))
2469  {
2470  GNUNET_break_op (0);
2471  return GNUNET_SYSERR;
2472  }
2473  return GNUNET_OK;
2474 }
2475 
2476 
/**
 * Process an offer message: for each offered hash that is not already an
 * element of this operation and has not been demanded before, send a
 * demand message containing that single hash to the remote peer.
 *
 * NOTE(review): the function-name line, the element lookup, the
 * duplicate-demand lookup/insert calls, the log calls, the message-type
 * argument and the final statement are absent from this listing.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the offer message; hash codes follow the header
 */
2484 static void
2486  const struct GNUNET_MessageHeader *mh)
2487 {
2488  struct Operation *op = cls;
2489  const struct GNUNET_HashCode *hash;
2490  unsigned int num_hashes;
2491 
2492  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2493  / sizeof(struct GNUNET_HashCode);
2494  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2495  num_hashes > 0;
2496  hash++, num_hashes--)
2497  {
2498  struct ElementEntry *ee;
2499  struct GNUNET_MessageHeader *demands;
2500  struct GNUNET_MQ_Envelope *ev;
2501 
2503  hash);
   /* nothing to demand if the element already belongs to this operation */
2504  if (NULL != ee)
2505  if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2506  continue;
2507 
2508  if (GNUNET_YES ==
2510  hash))
2511  {
2513  "Skipped sending duplicate demand\n");
2514  continue;
2515  }
2516 
2519  op->demanded_hashes,
2520  hash,
2521  NULL,
2523 
2525  "[OP %p] Requesting element (hash %s)\n",
2526  op, GNUNET_h2s (hash));
   /* one demand message per hash: header followed by the hash code */
2527  ev = GNUNET_MQ_msg_header_extra (demands,
2528  sizeof(struct GNUNET_HashCode),
2530  GNUNET_memcpy (&demands[1],
2531  hash,
2532  sizeof(struct GNUNET_HashCode));
2533  GNUNET_MQ_send (op->mq, ev);
2534  }
2536 }
2537 
2538 
/**
 * Handle a DONE message from the remote peer, dispatching on op->phase.
 * In PHASE_ACTIVE_DECODING the phase is set to PHASE_FINISH_CLOSING and
 * maybe_finish() is called; a first case (its label is not visible,
 * the log text calls it the "passive partner" case) also ends in
 * maybe_finish(); every other phase fails the operation.
 *
 * NOTE(review): this listing is missing the function-name line, the
 * first case label, the log calls and the line that closes the first
 * long comment inside the switch, so the body below does not read as
 * valid C as shown. The body is reproduced unchanged.
 *
 * @param cls the `struct Operation *` the message arrived for
 * @param mh the message (not inspected in the visible code)
 */
2545 static void
2547  const struct GNUNET_MessageHeader *mh)
2548 {
2549  struct Operation *op = cls;
2550 
2551  switch (op->phase)
2552  {
2554  /* We got all requests, but still have to send our elements in response. */
2557  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2558  /* The active peer is done sending offers
2559  * and inquiries. This means that all
2560  * our responses to that (demands and offers)
2561  * must be in flight (queued or in mesh).
2562  *
2563  * We should notify the active peer once
2564  * all our demands are satisfied, so that the active
2565  * peer can quit if we gave it everything.
2567  maybe_finish (op);
2568  return;
2569  case PHASE_ACTIVE_DECODING:
2571  "got DONE (as active partner), waiting to finish\n");
2572  /* All demands of the other peer are satisfied,
2573  * and we processed all offers, thus we know
2574  * exactly what our demands must be.
2575  *
2576  * We'll close the channel
2577  * to the other peer once our demands are met.
2578  */op->phase = PHASE_FINISH_CLOSING;
2580  maybe_finish (op);
2581  return;
2582  default:
2583  GNUNET_break_op (0);
2584  fail_union_operation (op);
2585  return;
2586  }
2587 }
2588 
2589 
/**
 * Handle an OVER message from the remote peer by calling
 * send_client_done() for the operation.
 *
 * NOTE(review): the function-name line is absent from this listing;
 * presumably the union_p2p_over handler -- confirm.
 *
 * @param cls the operation, passed straight to send_client_done()
 * @param mh the message (not inspected)
 */
2596 static void
2598  const struct GNUNET_MessageHeader *mh)
2599 {
2600  send_client_done (cls);
2601 }
2602 
2603 
2612 static struct Operation *
2613 get_incoming (uint32_t id)
2614 {
2615  for (struct Listener *listener = listener_head;
2616  NULL != listener;
2617  listener = listener->next)
2618  {
2619  for (struct Operation *op = listener->op_head;
2620  NULL != op;
2621  op = op->next)
2622  if (op->suggest_id == id)
2623  return op;
2624  }
2625  return NULL;
2626 }
2627 
2628 
/**
 * Set up per-client state for a newly connected client: increments
 * num_clients and allocates a `struct ClientState` that remembers the
 * client handle and its message queue.
 *
 * NOTE(review): the function-name line is absent from this listing; the
 * service registration names a client_connect_cb, which presumably is
 * this function -- confirm.
 *
 * @param c the client that connected
 * @param mq message queue for talking to @a c
 * @return the freshly allocated `struct ClientState`
 */
2637 static void *
2639  struct GNUNET_SERVICE_Client *c,
2640  struct GNUNET_MQ_Handle *mq)
2641 {
2642  struct ClientState *cs;
2643 
2644  num_clients++;
2645  cs = GNUNET_new (struct ClientState);
2646  cs->client = c;
2647  cs->mq = mq;
2648  return cs;
2649 }
2650 
2651 
/**
 * Hash-map iterator callback that frees the visited element entry.
 *
 * NOTE(review): the function-name line is absent from this listing.
 *
 * @param key key of the visited entry (unused)
 * @param value the `struct ElementEntry *` to free
 * @return always #GNUNET_YES (presumably "keep iterating" -- confirm
 *         against the container API)
 */
2660 static int
2662  const struct GNUNET_HashCode *key,
2663  void *value)
2664 {
2665  struct ElementEntry *ee = value;
2666 
2667  GNUNET_free (ee);
2668  return GNUNET_YES;
2669 }
2670 
2671 
/**
 * Clean up after a client has disconnected.
 *
 * If the client owned a set: destroys all pending operations on it,
 * destroys the set's strata estimator, drops one reference on the set
 * content (freeing it when the count reaches zero) and frees the set.
 * If the client owned a listener: closes its CADET port, destroys all
 * incoming operations still queued on it, unlinks and frees it.
 * Finally frees the client state, decrements num_clients and, when a
 * shutdown is in progress and no clients remain, disconnects from CADET.
 *
 * NOTE(review): the function-name line, several log calls and the calls
 * that iterate over / destroy the element map are absent from this
 * listing.
 *
 * @param client the client that disconnected
 * @param internal_cls the `struct ClientState *` of that client
 */
2679 static void
2681  struct GNUNET_SERVICE_Client *client,
2682  void *internal_cls)
2683 {
2684  struct ClientState *cs = internal_cls;
2685  struct Operation *op;
2686  struct Listener *listener;
2687  struct Set *set;
2688 
2690  "Client disconnected, cleaning up\n");
2691  if (NULL != (set = cs->set))
2692  {
2693  struct SetContent *content = set->content;
2694 
2696  "Destroying client's set\n");
2697  /* Destroy pending set operations */
2698  while (NULL != set->ops_head)
2699  _GSS_operation_destroy (set->ops_head);
2700 
2701  /* Destroy operation-specific state */
2702  if (NULL != set->se)
2703  {
2704  strata_estimator_destroy (set->se);
2705  set->se = NULL;
2706  }
2707  /* free set content (or at least decrement RC) */
2708  set->content = NULL;
2709  GNUNET_assert (0 != content->refcount);
2710  content->refcount--;
2711  if (0 == content->refcount)
2712  {
2713  GNUNET_assert (NULL != content->elements);
2716  NULL);
2718  content->elements = NULL;
2719  GNUNET_free (content);
2720  }
2721  GNUNET_free (set);
2722  }
2723 
2724  if (NULL != (listener = cs->listener))
2725  {
2727  "Destroying client's listener\n");
2728  GNUNET_CADET_close_port (listener->open_port);
2729  listener->open_port = NULL;
   /* loop ends once listener->op_head is NULL; presumably
      incoming_destroy() unlinks op from the listener -- confirm */
2730  while (NULL != (op = listener->op_head))
2731  {
2733  "Destroying incoming operation `%u' from peer `%s'\n",
2734  (unsigned int) op->client_request_id,
2735  GNUNET_i2s (&op->peer));
2736  incoming_destroy (op);
2737  }
2738  GNUNET_CONTAINER_DLL_remove (listener_head,
2739  listener_tail,
2740  listener);
2741  GNUNET_free (listener);
2742  }
2743  GNUNET_free (cs);
2744  num_clients--;
   /* last client gone during shutdown: release the CADET handle */
2745  if ( (GNUNET_YES == in_shutdown) &&
2746  (0 == num_clients) )
2747  {
2748  if (NULL != cadet)
2749  {
2750  GNUNET_CADET_disconnect (cadet);
2751  cadet = NULL;
2752  }
2753  }
2754 }
2755 
2756 
/**
 * Validate an operation request arriving on a freshly opened channel.
 *
 * NOTE(review): the function-name line is absent from this listing;
 * presumably the check routine paired with the incoming_msg
 * handler -- confirm.
 *
 * @param cls the `struct Operation *` created for the channel
 * @param msg the operation request
 * @return #GNUNET_SYSERR if a request was already received for this
 *         operation (suggest_id already set), if the operation has no
 *         listener, or if the nested context message exceeds
 *         GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE; #GNUNET_OK otherwise
 */
2765 static int
2767  const struct OperationRequestMessage *msg)
2768 {
2769  struct Operation *op = cls;
2770  struct Listener *listener = op->listener;
2771  const struct GNUNET_MessageHeader *nested_context;
2772 
2773  /* double operation request */
2774  if (0 != op->suggest_id)
2775  {
2776  GNUNET_break_op (0);
2777  return GNUNET_SYSERR;
2778  }
2779  /* This should be equivalent to the previous condition, but can't hurt to check twice */
2780  if (NULL == listener)
2781  {
2782  GNUNET_break (0);
2783  return GNUNET_SYSERR;
2784  }
   /* a nested context message is optional, but bounded in size */
2785  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2786  if ((NULL != nested_context) &&
2787  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2788  {
2789  GNUNET_break_op (0);
2790  return GNUNET_SYSERR;
2791  }
2792  return GNUNET_OK;
2793 }
2794 
2795 
/**
 * Handle an operation request from a remote peer: copy the optional
 * nested context message, record the remote element count, assign a
 * fresh non-zero suggest_id to the operation and forward the request
 * (accept id, peer identity, context) to the listening client.
 *
 * NOTE(review): the function-name line, the log-level arguments, the
 * statement acting on op->timeout_task before it is cleared and the
 * message-type argument are absent from this listing.
 *
 * @param cls the `struct Operation *` created for the channel
 * @param msg the operation request
 */
2811 static void
2813  const struct OperationRequestMessage *msg)
2814 {
2815  struct Operation *op = cls;
2816  struct Listener *listener = op->listener;
2817  const struct GNUNET_MessageHeader *nested_context;
2818  struct GNUNET_MQ_Envelope *env;
2819  struct GNUNET_SETU_RequestMessage *cmsg;
2820 
2821  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2822  /* Make a copy of the nested_context (application-specific context
2823  information that is opaque to set) so we can pass it to the
2824  listener later on */
2825  if (NULL != nested_context)
2826  op->context_msg = GNUNET_copy_message (nested_context);
2827  op->remote_element_count = ntohl (msg->element_count);
2828  GNUNET_log (
2830  "Received P2P operation request (port %s) for active listener\n",
2831  GNUNET_h2s (&op->listener->app_id));
2832  GNUNET_assert (0 == op->suggest_id);
   /* 0 means "no id assigned yet" (see the assert above), so skip it
      when the global counter wraps */
2833  if (0 == suggest_id)
2834  suggest_id++;
2835  op->suggest_id = suggest_id++;
2836  GNUNET_assert (NULL != op->timeout_task);
2838  op->timeout_task = NULL;
2839  env = GNUNET_MQ_msg_nested_mh (cmsg,
2841  op->context_msg);
2842  GNUNET_log (
2844  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2845  op->suggest_id,
2846  listener,
2847  listener->cs);
2848  cmsg->accept_id = htonl (op->suggest_id);
2849  cmsg->peer_id = op->peer;
2850  GNUNET_MQ_send (listener->cs->mq,
2851  env);
2852  /* NOTE: GNUNET_CADET_receive_done() will be called in
2853  #handle_client_accept() */
2854 }
2855 
2856 
/**
 * Handle a client's request to create a set. A client may own at most
 * one set; on success the new set gets a strata estimator and an empty
 * content object (refcount 1, fresh element hash map) and is linked to
 * the client state.
 *
 * NOTE(review): the function-name line, the log calls, the strata
 * estimator creation call and the statements following the error
 * branches and the end of the function are absent from this listing.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the create message (not inspected in the visible code)
 */
2865 static void
2867  const struct GNUNET_SETU_CreateMessage *msg)
2868 {
2869  struct ClientState *cs = cls;
2870  struct Set *set;
2871 
2873  "Client created new set for union operation\n");
2874  if (NULL != cs->set)
2875  {
2876  /* There can only be one set per client */
2877  GNUNET_break (0);
2879  return;
2880  }
2881  set = GNUNET_new (struct Set);
2882  {
2883  struct StrataEstimator *se;
2884 
2886  SE_IBF_SIZE,
2887  SE_IBF_HASH_NUM);
2888  if (NULL == se)
2889  {
2891  "Failed to allocate strata estimator\n");
   /* undo the set allocation before bailing out */
2892  GNUNET_free (set);
2894  return;
2895  }
2896  set->se = se;
2897  }
2898  set->content = GNUNET_new (struct SetContent);
2899  set->content->refcount = 1;
2900  set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2901  GNUNET_YES);
2902  set->cs = cs;
2903  cs->set = set;
2905 }
2906 
2907 
/**
 * Timeout task for an incoming operation: clears the operation's
 * timeout_task handle and destroys the operation via incoming_destroy().
 *
 * NOTE(review): the function-name/parameter line and the log call are
 * absent from this listing; the name used here is an assumption.
 *
 * cls is the `struct Operation *` that timed out.
 */
2917 static void
2919 {
2920  struct Operation *op = cls;
2921 
   /* the task is running now, so the stored handle is stale */
2922  op->timeout_task = NULL;
2924  "Remote peer's incoming request timed out\n");
2925  incoming_destroy (op);
2926 }
2927 
2928 
/**
 * Callback for a new incoming CADET channel on a listener's port.
 * Allocates a `struct Operation` bound to the listener, the remote peer
 * and the channel, and fetches the channel's message queue.
 *
 * NOTE(review): the log call and the statements that use UINT32_MAX,
 * schedule something with `op` and insert `op` into the listener's
 * list are only partially visible in this listing.
 *
 * @param cls the `struct Listener *` whose port was connected to
 * @param channel the new channel
 * @param source identity of the peer that opened the channel
 * @return the new operation (presumably becomes the channel's context,
 *         as channel_end_cb receives an operation -- confirm)
 */
2945 static void *
2946 channel_new_cb (void *cls,
2947  struct GNUNET_CADET_Channel *channel,
2948  const struct GNUNET_PeerIdentity *source)
2949 {
2950  struct Listener *listener = cls;
2951  struct Operation *op;
2952 
2954  "New incoming channel\n");
2955  op = GNUNET_new (struct Operation);
2956  op->listener = listener;
2957  op->peer = *source;
2958  op->channel = channel;
2959  op->mq = GNUNET_CADET_get_mq (op->channel);
2961  UINT32_MAX);
2964  op);
2966  listener->op_tail,
2967  op);
2968  return op;
2969 }
2970 
2971 
/**
 * Callback for a CADET channel that has ended: forgets the channel
 * handle stored in the operation.
 *
 * NOTE(review): the statement that follows (presumably the actual
 * operation teardown) is absent from this listing.
 *
 * @param channel_ctx the `struct Operation *` associated with the channel
 * @param channel the channel that ended (unused in the visible code)
 */
2988 static void
2989 channel_end_cb (void *channel_ctx,
2990  const struct GNUNET_CADET_Channel *channel)
2991 {
2992  struct Operation *op = channel_ctx;
2993 
2994  op->channel = NULL;
2996 }
2997 
2998 
/**
 * Channel window-size callback; intentionally empty at the moment.
 *
 * NOTE(review): the function-name line is absent from this listing; the
 * name used here is an assumption.
 *
 * @param channel the channel concerned (unused)
 * @param window_size the reported window size (unused)
 */
3013 static void
3015  const struct GNUNET_CADET_Channel *channel,
3016  int window_size)
3017 {
3018  /* FIXME: not implemented, we could do flow control here... */
3019 }
3020 
3021 
/**
 * Handle a client's request to listen for incoming operations of a
 * given application id. A client may have at most one listener. The new
 * listener is linked into the global listener list and a CADET port is
 * opened for the application id, with channel_new_cb / channel_end_cb
 * as channel callbacks and the peer-to-peer message handlers below.
 *
 * NOTE(review): the function-name line, all message-type constants in
 * the handler table, the table terminator, the log call and the
 * statements following the error branch and the end of the function are
 * absent from this listing.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the listen message carrying the application id
 */
3028 static void
3030  const struct GNUNET_SETU_ListenMessage *msg)
3031 {
3032  struct ClientState *cs = cls;
   /* handler closures are NULL here, unlike in the evaluate handler where
      they are set to the operation */
3033  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3034  GNUNET_MQ_hd_var_size (incoming_msg,
3036  struct OperationRequestMessage,
3037  NULL),
3038  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3040  struct IBFMessage,
3041  NULL),
3042  GNUNET_MQ_hd_var_size (union_p2p_elements,
3045  NULL),
3046  GNUNET_MQ_hd_var_size (union_p2p_offer,
3048  struct GNUNET_MessageHeader,
3049  NULL),
3050  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3052  struct InquiryMessage,
3053  NULL),
3054  GNUNET_MQ_hd_var_size (union_p2p_demand,
3056  struct GNUNET_MessageHeader,
3057  NULL),
3058  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3060  struct GNUNET_MessageHeader,
3061  NULL),
3062  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3064  struct GNUNET_MessageHeader,
3065  NULL),
3066  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3068  struct GNUNET_MessageHeader,
3069  NULL),
3070  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3072  struct GNUNET_MessageHeader,
3073  NULL),
3074  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3076  struct StrataEstimatorMessage,
3077  NULL),
3078  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3080  struct StrataEstimatorMessage,
3081  NULL),
3082  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3085  NULL),
3087  };
3088  struct Listener *listener;
3089 
3090  if (NULL != cs->listener)
3091  {
3092  /* max. one active listener per client! */
3093  GNUNET_break (0);
3095  return;
3096  }
3097  listener = GNUNET_new (struct Listener);
3098  listener->cs = cs;
3099  cs->listener = listener;
3100  listener->app_id = msg->app_id;
3101  GNUNET_CONTAINER_DLL_insert (listener_head,
3102  listener_tail,
3103  listener);
3105  "New listener created (port %s)\n",
3106  GNUNET_h2s (&listener->app_id));
3107  listener->open_port = GNUNET_CADET_open_port (cadet,
3108  &msg->app_id,
3109  &channel_new_cb,
3110  listener,
3112  &channel_end_cb,
3113  cadet_handlers);
3115 }
3116 
3117 
/**
 * Handle a client's rejection of a suggested incoming operation. The
 * operation is looked up by the accept/reject id from the message; an
 * unknown id is not treated as a hard error (the peer may already have
 * disconnected).
 *
 * NOTE(review): the function-name line, the log calls and every
 * statement that acts on the client or the found operation are absent
 * from this listing.
 *
 * @param cls the `struct ClientState *` of the rejecting client
 * @param msg the reject message
 */
3125 static void
3127  const struct GNUNET_SETU_RejectMessage *msg)
3128 {
3129  struct ClientState *cs = cls;
3130  struct Operation *op;
3131 
3132  op = get_incoming (ntohl (msg->accept_reject_id));
3133  if (NULL == op)
3134  {
3135  /* no matching incoming operation for this reject;
3136  could be that the other peer already disconnected... */
3138  "Client rejected unknown operation %u\n",
3139  (unsigned int) ntohl (msg->accept_reject_id));
3141  return;
3142  }
3144  "Peer request (app %s) rejected by client\n",
3145  GNUNET_h2s (&cs->listener->app_id));
3148 }
3149 
3150 
/**
 * Check routine for a client's add-element message. Currently accepts
 * every message.
 *
 * NOTE(review): the function-name line is absent from this listing;
 * presumably the check routine paired with the client_set_add
 * handler -- confirm.
 *
 * @param msg the element message (not inspected)
 * @return always #GNUNET_OK
 */
3157 static int
3159  const struct GNUNET_SETU_ElementMessage *msg)
3160 {
3161  /* NOTE: Technically, we should probably check with the
3162  block library whether the element we are given is well-formed */
3163  return GNUNET_OK;
3164 }
3165 
3166 
/**
 * Handle a client's request to add an element to its set. The element
 * is hashed and looked up in the set's element map; if it is not yet
 * present, a new `struct ElementEntry` (with the payload stored directly
 * behind it) is created for the current generation and the element's
 * IBF key is inserted into the set's strata estimator. An element that
 * is already present is ignored.
 *
 * NOTE(review): the function-name line, the hashing call, the log
 * calls, the assignment of the element type, the map insertion call and
 * the statements acknowledging or dropping the client are absent from
 * this listing.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the element message; the element payload follows the header
 */
3173 static void
3175  const struct GNUNET_SETU_ElementMessage *msg)
3176 {
3177  struct ClientState *cs = cls;
3178  struct Set *set;
3179  struct GNUNET_SETU_Element el;
3180  struct ElementEntry *ee;
3181  struct GNUNET_HashCode hash;
3182 
3183  if (NULL == (set = cs->set))
3184  {
3185  /* client without a set requested an operation */
3186  GNUNET_break (0);
3188  return;
3189  }
3191  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
   /* el only borrows the payload from msg; it is copied below if needed */
3192  el.size = ntohs (msg->header.size) - sizeof(*msg);
3193  el.data = &msg[1];
3194  el.element_type = ntohs (msg->element_type);
3196  &hash);
3197  ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3198  &hash);
3199  if (NULL == ee)
3200  {
3202  "Client inserts element %s of size %u\n",
3203  GNUNET_h2s (&hash),
3204  el.size);
   /* single allocation: entry header followed by the element bytes */
3205  ee = GNUNET_malloc (el.size + sizeof(*ee));
3206  ee->element.size = el.size;
3207  GNUNET_memcpy (&ee[1], el.data, el.size);
3208  ee->element.data = &ee[1];
3210  ee->remote = GNUNET_NO;
3211  ee->generation = set->current_generation;
3212  ee->element_hash = hash;
3215  set->content->elements,
3216  &ee->element_hash,
3217  ee,
3219  }
3220  else
3221  {
3223  "Client inserted element %s of size %u twice (ignored)\n",
3224  GNUNET_h2s (&hash),
3225  el.size);
3226  /* same element inserted twice */
3227  return;
3228  }
3229  strata_estimator_insert (set->se,
3230  get_ibf_key (&ee->element_hash));
3231 }
3232 
3233 
/**
 * Advance the generation counters of a set: increments both the
 * content's latest_generation and the set's current_generation.
 *
 * NOTE(review): the function-name/parameter line is absent from this
 * listing; callers in this file invoke it as advance_generation (set).
 */
3240 static void
3242 {
3243  set->content->latest_generation++;
3244  set->current_generation++;
3245 }
3246 
3247 
/**
 * Check routine for a client's evaluate message. Currently accepts
 * every message.
 *
 * NOTE(review): the function-name line is absent from this listing;
 * presumably the check routine paired with the client_evaluate
 * handler -- confirm.
 *
 * @param msg the evaluate message (not inspected)
 * @return always #GNUNET_OK
 */
3257 static int
3259  const struct GNUNET_SETU_EvaluateMessage *msg)
3260 {
3261  /* FIXME: suboptimal, even if the context below could be NULL,
3262  there are malformed messages this does not check for... */
3263  return GNUNET_OK;
3264 }
3265 
3266 
/**
 * Handle a client's request to evaluate (initiate) a union operation
 * with a remote peer. Allocates the operation, copies the request
 * parameters from the message, advances the set's generation, links the
 * operation into the set's list, opens a CADET channel to the target
 * peer/application and sends the operation request (with the optional
 * nested context message). The operation starts in PHASE_EXPECT_SE with
 * a copy of the set's strata estimator.
 *
 * NOTE(review): the function-name line, all message-type constants in
 * the handler table, the table terminator, the log calls, the creation
 * of the key-to-element map and the statements following the error
 * branches and the end of the function are absent from this listing.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the evaluate message
 */
3275 static void
3277  const struct GNUNET_SETU_EvaluateMessage *msg)
3278 {
3279  struct ClientState *cs = cls;
3280  struct Operation *op = GNUNET_new (struct Operation);
   /* every handler gets the new operation as its closure */
3281  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3282  GNUNET_MQ_hd_var_size (incoming_msg,
3284  struct OperationRequestMessage,
3285  op),
3286  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3288  struct IBFMessage,
3289  op),
3290  GNUNET_MQ_hd_var_size (union_p2p_elements,
3293  op),
3294  GNUNET_MQ_hd_var_size (union_p2p_offer,
3296  struct GNUNET_MessageHeader,
3297  op),
3298  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3300  struct InquiryMessage,
3301  op),
3302  GNUNET_MQ_hd_var_size (union_p2p_demand,
3304  struct GNUNET_MessageHeader,
3305  op),
3306  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3308  struct GNUNET_MessageHeader,
3309  op),
3310  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3312  struct GNUNET_MessageHeader,
3313  op),
3314  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3316  struct GNUNET_MessageHeader,
3317  op),
3318  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3320  struct GNUNET_MessageHeader,
3321  op),
3322  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3324  struct StrataEstimatorMessage,
3325  op),
3326  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3328  struct StrataEstimatorMessage,
3329  op),
3330  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3333  op),
3335  };
3336  struct Set *set;
3337  const struct GNUNET_MessageHeader *context;
3338 
3339  if (NULL == (set = cs->set))
3340  {
3341  GNUNET_break (0);
   /* op was allocated in the declaration above; release it again */
3342  GNUNET_free (op);
3344  return;
3345  }
3347  UINT32_MAX);
3348  op->peer = msg->target_peer;
3349  op->client_request_id = ntohl (msg->request_id);
3350  op->byzantine = msg->byzantine;
3351  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3352  op->force_full = msg->force_full;
3353  op->force_delta = msg->force_delta;
3354  op->symmetric = msg->symmetric;
3355  context = GNUNET_MQ_extract_nested_mh (msg);
3356 
3357  /* Advance generation values, so that
3358  mutations won't interfere with the running operation. */
3359  op->set = set;
3360  op->generation_created = set->current_generation;
3361  advance_generation (set);
3362  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3363  set->ops_tail,
3364  op);
3366  "Creating new CADET channel to port %s for set union\n",
3367  GNUNET_h2s (&msg->app_id));
3368  op->channel = GNUNET_CADET_channel_create (cadet,
3369  op,
3370  &msg->target_peer,
3371  &msg->app_id,
3373  &channel_end_cb,
3374  cadet_handlers);
3375  op->mq = GNUNET_CADET_get_mq (op->channel);
3376  {
3377  struct GNUNET_MQ_Envelope *ev;
   /* NOTE(review): this inner `msg` shadows the function parameter `msg` */
3378  struct OperationRequestMessage *msg;
3379 
3380  ev = GNUNET_MQ_msg_nested_mh (msg,
3382  context);
3383  if (NULL == ev)
3384  {
3385  /* the context message is too large */
3386  GNUNET_break (0);
3388  return;
3389  }
3391  GNUNET_NO);
3392  /* copy the current generation's strata estimator for this operation */
3393  op->se = strata_estimator_dup (op->set->se);
3394  /* we started the operation, thus we have to send the operation request */
3395  op->phase = PHASE_EXPECT_SE;
3396  op->salt_receive = op->salt_send = 42; // FIXME?????
3398  "Initiating union operation evaluation\n");
3399  GNUNET_STATISTICS_update (_GSS_statistics,
3400  "# of total union operations",
3401  1,
3402  GNUNET_NO);
3403  GNUNET_STATISTICS_update (_GSS_statistics,
3404  "# of initiated union operations",
3405  1,
3406  GNUNET_NO);
3407  GNUNET_MQ_send (op->mq,
3408  ev);
3409  if (NULL != context)
3411  "sent op request with context message\n");
3412  else
3414  "sent op request without context message\n");
3417  op->key_to_element);
3418 
3419  }
3421 }
3422 
3423 
/**
 * Handle a client's request to cancel one of its operations. The
 * operation is searched in the set's operation list by client request
 * id; a request for an id that is not found is tolerated, since the
 * operation may already have been destroyed.
 *
 * NOTE(review): the function-name line, the log calls, the statement
 * that acts on the found operation and the statements acknowledging or
 * dropping the client are absent from this listing.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the cancel message carrying the request id
 */
3430 static void
3432  const struct GNUNET_SETU_CancelMessage *msg)
3433 {
3434  struct ClientState *cs = cls;
3435  struct Set *set;
3436  struct Operation *op;
3437  int found;
3438 
3439  if (NULL == (set = cs->set))
3440  {
3441  /* client without a set requested an operation */
3442  GNUNET_break (0);
3444  return;
3445  }
   /* linear search; on a match, op is left pointing at the operation */
3446  found = GNUNET_NO;
3447  for (op = set->ops_head; NULL != op; op = op->next)
3448  {
3449  if (op->client_request_id == ntohl (msg->request_id))
3450  {
3451  found = GNUNET_YES;
3452  break;
3453  }
3454  }
3455  if (GNUNET_NO == found)
3456  {
3457  /* It may happen that the operation was already destroyed due to
3458  * the other peer disconnecting. The client may not know about this
3459  * yet and try to cancel the (just barely non-existent) operation.
3460  * So this is not a hard error.
3461  */
3463  "Client canceled non-existent op %u\n",
3464  (uint32_t) ntohl (msg->request_id));
3465  }
3466  else
3467  {
3469  "Client requested cancel for op %u\n",
3470  (uint32_t) ntohl (msg->request_id));
3472  }
3474 }
3475 
3476 
/**
 * Handle a client's acceptance of a suggested incoming operation.
 *
 * If the operation no longer exists, a result message with status
 * GNUNET_SETU_STATUS_FAILURE is sent back to the client. Otherwise the
 * operation is detached from its listener, attached to the client's
 * set, configured from the accept message, given a copy of the set's
 * strata estimator, and the serialized strata estimator (together with
 * the set size) is sent to the remote peer; the operation then waits in
 * PHASE_EXPECT_IBF.
 *
 * NOTE(review): the function-name line, the log calls, message-type
 * constants, the assignments to `type`, the creation of the
 * key-to-element map, the removal from the listener's list and the
 * trailing statements are absent from this listing.
 *
 * @param cls the `struct ClientState *` of the accepting client
 * @param msg the accept message
 */
3485 static void
3487  const struct GNUNET_SETU_AcceptMessage *msg)
3488 {
3489  struct ClientState *cs = cls;
3490  struct Set *set;
3491  struct Operation *op;
3492  struct GNUNET_SETU_ResultMessage *result_message;
3493  struct GNUNET_MQ_Envelope *ev;
3494  struct Listener *listener;
3495 
3496  if (NULL == (set = cs->set))
3497  {
3498  /* client without a set requested to accept */
3499  GNUNET_break (0);
3501  return;
3502  }
3503  op = get_incoming (ntohl (msg->accept_reject_id));
3504  if (NULL == op)
3505  {
3506  /* It is not an error if the set op does not exist -- it may
3507  * have been destroyed when the partner peer disconnected. */
3508  GNUNET_log (
3510  "Client %p accepted request %u of listener %p that is no longer active\n",
3511  cs,
3512  ntohl (msg->accept_reject_id),
3513  cs->listener);
3514  ev = GNUNET_MQ_msg (result_message,
   /* request_id is copied as-is, without byte-order conversion */
3516  result_message->request_id = msg->request_id;
3517  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3518  GNUNET_MQ_send (set->cs->mq, ev);
3520  return;
3521  }
3523  "Client accepting request %u\n",
3524  (uint32_t) ntohl (msg->accept_reject_id));
   /* move the operation from the listener to the client's set */
3525  listener = op->listener;
3526  op->listener = NULL;
3528  listener->op_tail,
3529  op);
3530  op->set = set;
3531  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3532  set->ops_tail,
3533  op);
3534  op->client_request_id = ntohl (msg->request_id);
3535  op->byzantine = msg->byzantine;
3536  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3537  op->force_full = msg->force_full;
3538  op->force_delta = msg->force_delta;
3539  op->symmetric = msg->symmetric;
3540 
3541  /* Advance generation values, so that future mutations do not
3542  interfere with the running operation. */
3543  op->generation_created = set->current_generation;
3544  advance_generation (set);
3545  GNUNET_assert (NULL == op->se);
3546 
3548  "accepting set union operation\n");
3549  GNUNET_STATISTICS_update (_GSS_statistics,
3550  "# of accepted union operations",
3551  1,
3552  GNUNET_NO);
3553  GNUNET_STATISTICS_update (_GSS_statistics,
3554  "# of total union operations",
3555  1,
3556  GNUNET_NO);
3557  {
3558  const struct StrataEstimator *se;
   /* NOTE(review): this inner `ev` shadows the outer `ev` declared above */
3559  struct GNUNET_MQ_Envelope *ev;
3560  struct StrataEstimatorMessage *strata_msg;
3561  char *buf;
3562  size_t len;
3563  uint16_t type;
3564 
3565  op->se = strata_estimator_dup (op->set->se);
3567  GNUNET_NO);
3568  op->salt_receive = op->salt_send = 42; // FIXME?????
3571  op->key_to_element);
3572 
3573  /* kick off the operation */
3574  se = op->se;
   /* buffer sized for the raw estimator; strata_estimator_write() reports
      how many bytes it actually produced */
3575  buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3576  len = strata_estimator_write (se,
3577  buf);
   /* the message type depends on whether the written form is shorter than
      the raw size (the two assignments to `type` are not visible here) */
3578  if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3580  else
3582  ev = GNUNET_MQ_msg_extra (strata_msg,
3583  len,
3584  type);
3585  GNUNET_memcpy (&strata_msg[1],
3586  buf,
3587  len);
3588  GNUNET_free (buf);
3589  strata_msg->set_size
3591  op->set->content->elements));
3592  GNUNET_MQ_send (op->mq,
3593  ev);
3594  op->phase = PHASE_EXPECT_IBF;
3595  }
3596  /* Now allow CADET to continue, as we did not do this in
3597  #handle_incoming_msg (as we wanted to first see if the
3598  local client would accept the request). */
3601 }
3602 
3603 
/**
 * Task run on shutdown. Disconnects from CADET right away only when no
 * clients are connected any more, then destroys the statistics handle.
 *
 * NOTE(review): the statement following the first comment and the log
 * call are absent from this listing. client_disconnect_cb performs the
 * CADET disconnect once the last client is gone while in_shutdown is
 * set, so presumably the missing statement sets in_shutdown -- confirm.
 *
 * @param cls closure (unused in the visible code)
 */
3609 static void
3610 shutdown_task (void *cls)
3611 {
3612  /* Delay actual shutdown to allow service to disconnect clients */
3614  if (0 == num_clients)
3615  {
3616  if (NULL != cadet)
3617  {
3618  GNUNET_CADET_disconnect (cadet);
3619  cadet = NULL;
3620  }
3621  }
3622  GNUNET_STATISTICS_destroy (_GSS_statistics,
3623  GNUNET_YES);
3625  "handled shutdown request\n");
3626 }
3627 
3628 
/**
 * Service start-up function: creates the "setu" statistics handle and
 * connects to the CADET service. A failed CADET connection is reported
 * and the function returns early.
 *
 * NOTE(review): the third parameter line, the call that registers
 * something with a NULL closure near the top (presumably the shutdown
 * task), the log-level line and the statement before the early return
 * are absent from this listing.
 *
 * @param cls closure (unused in the visible code)
 * @param cfg configuration used for statistics and CADET
 */
3637 static void
3638 run (void *cls,
3639  const struct GNUNET_CONFIGURATION_Handle *cfg,
3641 {
3642  /* FIXME: need to modify SERVICE (!) API to allow
3643  us to run a shutdown task *after* clients were
3644  forcefully disconnected! */
3646  NULL);
3647  _GSS_statistics = GNUNET_STATISTICS_create ("setu",
3648  cfg);
3649  cadet = GNUNET_CADET_connect (cfg);
3650  if (NULL == cadet)
3651  {
3653  _ ("Could not connect to CADET service\n"));
3655  return;
3656  }
3657 }
3658 
3659 
/* Arguments of the service's main definition: service name, the run
   function and one client message handler each for accept, set_add,
   create_set, evaluate, listen, reject and cancel.
   NOTE(review): the macro name line, the option and connect/disconnect
   callback arguments, the message-type and struct arguments of every
   handler and the terminator are absent from this listing.
   NOTE(review): the service name passed here is "set" while the
   statistics handle in run() uses "setu" -- confirm this is intended. */
3664  "set",
3666  &run,
3669  NULL,
3670  GNUNET_MQ_hd_fixed_size (client_accept,
3673  NULL),
3674  GNUNET_MQ_hd_var_size (client_set_add,
3677  NULL),
3678  GNUNET_MQ_hd_fixed_size (client_create_set,
3681  NULL),
3682  GNUNET_MQ_hd_var_size (client_evaluate,
3685  NULL),
3686  GNUNET_MQ_hd_fixed_size (client_listen,
3689  NULL),
3690  GNUNET_MQ_hd_fixed_size (client_reject,
3693  NULL),
3694  GNUNET_MQ_hd_fixed_size (client_cancel,
3697  NULL),
3699 
3700 
3701 /* end of gnunet-service-setu.c */
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
Context for op_get_element_iterator.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
uint32_t offset
Offset of the strata in the rest of the message.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
struct Set * prev
Sets are held in a doubly linked list.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet...
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:55
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:775
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
struct Set * set
Set, if associated with the client, otherwise NULL.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
struct StrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
After sending the full set, wait for responses with the elements that the local peer is missing...
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
State we keep per client.
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
enum IntersectionOperationPhase phase
Current state of the operation.
static unsigned int element_size
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
uint8_t reserved1
Padding, must be 0.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
static int byzantine
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
struct Operation * op
Operation for which the elements should be sent.
uint32_t element_count
For Intersection: my element count.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Opaque handle to the service.
Definition: cadet_api.c:38
Handle to a service.
Definition: service.c:116
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct...
int byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct GNUNET_HashCode app_id
Application id.
Definition: setu.h:197
Element should be added to the result set of the remote peer, i.e.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
const void * data
Actual data of the element.
uint8_t symmetric
Also return set elements we are sending to the remote peer.
Definition: setu.h:220
uint16_t reserved
For alignment, always zero.
Definition: setu.h:290
uint32_t accept_reject_id
ID of the incoming request we want to reject.
Definition: setu.h:141
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1331
struct GNUNET_HashCode element_hash
Hash of the element.
unsigned int refcount
Number of references to the content.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
Message containing buckets of an invertible bloom filter.
struct Listener * next
Listeners are held in a doubly linked list.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
The other peer is decoding the IBF we just sent.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
uint32_t request_id
ID of the request we want to cancel.
Definition: setu.h:309
Invertible bloom filter (IBF).
Definition: ibf.h:82
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
UnionOperationPhase
Current phase we are in for a union operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
static void handle_union_p2p_request_full(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request for full set transmission.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
uint32_t accept_reject_id
ID of the incoming request we want to accept.
Definition: setu.h:88
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
struct KeyEntry * k
FIXME.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
uint64_t initial_size
Initial size of our set, just before the operation started.
Opaque handle to a channel.
Definition: cadet.h:116
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint64_t key_val
Definition: ibf.h:47
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
unsigned int ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
A listener is inhabited by a client, and waits for evaluation requests from remote peers...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
uint32_t request_id
id the result belongs to
Definition: setu.h:254
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:290
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL)...
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
uint32_t request_id
Request ID to identify responses.
Definition: setu.h:93
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:111
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:531
Handle for the service.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static unsigned int force_full
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int send_ibf(struct Operation *op, uint16_t ibf_order)
Send an ibf of appropriate size.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:265
Message sent by the client to the service to ask starting a new set to perform operations with...
Definition: setu.h:40
Internal representation of the hash map.
#define SE_IBF_SIZE
Size of the IBFs in the strata estimator.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up...
unsigned int generation_created
Generation in which the operation handle was created.
static unsigned int num_clients
Number of active clients.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected...
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
static pa_context * context
Pulseaudio context.
const void * data
Actual data of the element.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:910
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
A handle to a strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:209
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
#define _(String)
GNU gettext support macro.
Definition: platform.h:184
The key entry is used to associate an ibf key with an element.
Handle to a client that is connected to a service.
Definition: service.c:250
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
uint8_t symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
Definition: setu.h:117
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:379
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:285
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
Message sent by client to service to initiate a set operation as a client (not as listener)...
Definition: setu.h:176
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation...
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
The other peer refused to do the operation with us, or something went wrong.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
GNUNET_SETU_Status
Status for the result callback.
struct StrataEstimator * se
The strata estimator is only generated once for each set.
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:970
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
There must only be one value per key, but don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static char * value
Value of the record to add/remove.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
Information about an element in the set.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
Success, all elements have been sent (and received).
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set...
uint16_t element_type
Application-specific element type.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:228
uint8_t order
Order of the whole ibf, where num_buckets = 2^order.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
Element should be added to the result set of the local peer, i.e.
We sent the strata estimator, and expect an IBF.
A set that supports a specific operation with other peers.
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: setu.h:159
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer...
struct ElementEntry * element
The actual element associated with the key.
Randomness for IVs etc.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:90
uint16_t status
See PRISM_STATUS_*-constants.
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
static char buf[2048]
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
Continuation for multi part IBFs.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set...
In the penultimate phase, we wait until all our demands are satisfied.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:226
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:356
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
unsigned int strata_count
Size of the IBF array in strata.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
Internal representation of the hash map.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
A 512-bit hashcode.
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:260
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:78
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2325
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:203
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:876
Message handler for a specific message type.
static int res
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:215
#define LOG(kind,...)
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
int force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:323
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
uint64_t current_size
Current set size.
Definition: setu.h:249
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space...
Definition: gnunet_mq_lib.h:88
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:403
struct ClientState * cs
Client that owns the set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set...
char * getenv()
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries...
Strata estimator together with the peer's overall set size.
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:79
#define IBF_ALPHA
Number of buckets used in the ibf per estimated difference.
static struct GNUNET_SCHEDULER_Task * timeout_task
Task to be run on timeout.
Definition: gnunet-arm.c:124
Message sent by client to the service to add an element to the set.
Definition: setu.h:275
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
int client_done_sent
Did we send the client that we are done?
struct GNUNET_PeerIdentity target_peer
Peer to evaluate the operation with.
Definition: setu.h:192
Operation context used to execute a set operation.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
A request for an operation with another client.
Definition: setu.h:148
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
uint16_t reserved2
Padding, must be 0.
uint32_t request_id
Id of our set to evaluate, chosen implicitly by the client when it calls GNUNET_SETU_commit().
Definition: setu.h:187
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
Allow multiple values with the same key.
unsigned int generation
First generation that includes this element.
Handle to a message queue.
Definition: mq.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:167
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
The identity of the host (wraps the signing key of the peer).
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:164
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:888
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:105
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1031
struct GNUNET_CADET_Channel * channel
Channel to the peer.
We are decoding an IBF.
uint32_t salt
Salt used when hashing elements for this IBF.
struct Operation * ops_head
Evaluate operations are held in a linked list.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:239
configuration data
Definition: configuration.c:84
uint32_t salt
Salt to use for the operation.
struct GNUNET_SERVICE_Client * client
Client this is about.
const char * name
uint32_t client_request_id
ID used to identify an operation between service and client.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:201
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:299
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET...
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
#define GNUNET_log(kind,...)
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
Entry in list of pending tasks.
Definition: scheduler.c:134
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define MAX_IBF_ORDER
The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
struct GNUNET_SET_Element element
The actual element.
Opaque handle to a port.
Definition: cadet_api.c:79
Phase that receives full set first and then sends elements that are the local peer missing...
struct Listener * prev
Listeners are held in a doubly linked list.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
Header for all communications.
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port().
Definition: cadet_api.c:808
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:99
static unsigned int get_order_from_difference(unsigned int diff)
Compute the necessary order of an ibf from the size of the symmetric set difference.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:280
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
struct ClientState * cs
Client that owns the listener.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
uint32_t remote_element_count
Remote peer's element count.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:45
uint32_t salt
Salt used when hashing elements for this inquiry.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:123
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
uint32_t salt_send
Salt that we're using for sending IBFs.
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:837
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
We sent the request message, and expect a strata estimator.
struct Listener * listener
Port this operation runs on.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2244
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
Used as a closure for sending elements with a specific IBF key.
struct GNUNET_HashCode hash
FIXME.
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:131
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
#define GNUNET_malloc(size)
Wrapper around malloc.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
uint32_t received_total
Total number of elements received from the other peer.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:53
static unsigned int force_delta
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1082
int force_full
Always send full sets, even if delta operations would be more efficient.
struct GNUNET_HashCode app_id
application id
Definition: setu.h:70
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
Element stored in a set.
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
unsigned int ibf_size
Size of each IBF stratum (in bytes)