GNUnet  0.11.x
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
29 #include "ibf.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_cadet_service.h"
36 #include <gcrypt.h>
37 #include "gnunet_setu_service.h"
38 #include "setu.h"
39 
/**
 * Log a message for this service; forwards to GNUNET_log_from() with the
 * component name "setu".
 */
#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)

/**
 * Timeout related to incoming channels.
 * NOTE(review): presumably how long an incoming request may stay unanswered
 * before it is dropped; the use site is not visible here -- confirm.
 */
#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES

/**
 * Number of strata.
 * NOTE(review): presumably the number of IBFs in a strata estimator; the use
 * site is not visible here -- confirm.
 */
#define SE_STRATA_COUNT 32

/**
 * IBF size parameter used together with #SE_IBF_HASH_NUM when setting up a
 * strata estimator.
 * NOTE(review): presumably the bucket count of each strata IBF -- confirm.
 */
#define SE_IBF_SIZE 80

/**
 * Hash-function count passed as second argument to ibf_create() for both
 * the local and the remote IBF.
 */
#define SE_IBF_HASH_NUM 4

/**
 * Upper bound on the number of IBF buckets placed into a single IBF
 * message: as many buckets of IBF_BUCKET_SIZE bytes as fit into 2^15 bytes.
 */
#define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)

/**
 * Largest IBF order (log2 of the bucket count) up to which the order is
 * grown, both when deriving an order from an estimated difference and when
 * retrying with a larger IBF after a failed decode.
 */
#define MAX_IBF_ORDER (20)

/**
 * Factor by which the estimated set difference is multiplied to obtain the
 * minimum number of IBF buckets.
 */
#define IBF_ALPHA 4
80 
81 
86 {
91 
102 
107 
112 
117 
123 
129 
135 
141 };
142 
143 
150 struct ElementEntry
151 {
157 
163 
167  unsigned int generation;
168 
173  int remote;
174 };
175 
176 
/**
 * Forward declaration; `struct Listener` is defined further down in this
 * file and is referenced by pointer before that.
 */
struct Listener;


/**
 * Forward declaration; `struct Set` is defined further down in this file
 * and is referenced by pointer before that.
 */
struct Set;
188 
189 
/**
 * State the service keeps for one connected client.  Both `struct Set` and
 * `struct Listener` point back to it through their `cs` member.
 */
struct ClientState
{
  /**
   * Set associated with this client.
   * NOTE(review): presumably NULL when the client only listens -- confirm.
   */
  struct Set *set;

  /**
   * Listener associated with this client.
   * NOTE(review): presumably NULL when the client owns a set -- confirm.
   */
  struct Listener *listener;

  /**
   * Service-level handle of the client.
   */
  struct GNUNET_SERVICE_Client *client;

  /**
   * Message queue towards the client; result messages of an operation are
   * sent through `op->set->cs->mq`.
   */
  struct GNUNET_MQ_Handle *mq;
};
215 
216 
220 struct Operation
221 {
222 
227  struct GNUNET_PeerIdentity peer;
228 
232  uint64_t initial_size;
233 
237  struct Operation *next;
238 
242  struct Operation *prev;
243 
247  struct GNUNET_CADET_Channel *channel;
248 
252  struct Listener *listener;
253 
257  struct GNUNET_MQ_Handle *mq;
258 
262  struct GNUNET_MessageHeader *context_msg;
263 
268  struct Set *set;
269 
275 
280 
285 
292 
298 
303 
308 
312  int client_done_sent;
313 
317  unsigned int ibf_buckets_received;
318 
322  uint32_t salt_send;
323 
327  uint32_t salt_receive;
328 
333  uint32_t received_fresh;
334 
338  uint32_t received_total;
339 
343  uint32_t salt;
344 
348  uint32_t remote_element_count;
349 
353  uint32_t client_request_id;
354 
359  int force_delta;
360 
365  int force_full;
366 
371  int byzantine;
372 
378 
383  int byzantine_lower_bound;
384 
390  uint32_t suggest_id;
391 
396  unsigned int generation_created;
397 };
398 
399 
/**
 * Content of a set; `struct Set` refers to it through its `content`
 * pointer.
 */
struct SetContent
{
  /**
   * Hash map holding the elements of the set.
   * NOTE(review): presumably maps element hashes to `struct ElementEntry`
   * values -- confirm against the code that fills the map.
   */
  struct GNUNET_CONTAINER_MultiHashMap *elements;

  /**
   * Reference counter.
   * NOTE(review): presumably the number of sets sharing this content --
   * confirm.
   */
  unsigned int refcount;

  /**
   * Generation counter.
   * NOTE(review): presumably the newest generation handed out for this
   * content -- confirm.
   */
  unsigned int latest_generation;

  /**
   * Iterator counter.
   * NOTE(review): presumably the number of iterations currently running
   * over `elements` -- confirm.
   */
  int iterator_count;
};
426 
427 
431 struct Set
432 {
436  struct Set *next;
437 
441  struct Set *prev;
442 
447  struct ClientState *cs;
448 
453  struct SetContent *content;
454 
460 
464  struct Operation *ops_head;
465 
469  struct Operation *ops_tail;
470 
475  unsigned int current_generation;
476 
477 };
478 
479 
483 struct KeyEntry
484 {
488  struct IBF_Key ibf_key;
489 
496  struct ElementEntry *element;
497 
503  int received;
504 };
505 
506 
511 struct SendElementClosure
512 {
517  struct IBF_Key ibf_key;
518 
523  struct Operation *op;
524 };
525 
526 
531 struct Listener
532 {
536  struct Listener *next;
537 
541  struct Listener *prev;
542 
548  struct Operation *op_head;
549 
555  struct Operation *op_tail;
556 
561  struct ClientState *cs;
562 
566  struct GNUNET_CADET_Port *open_port;
567 
572  struct GNUNET_HashCode app_id;
573 
574 };
575 
576 
581 static struct GNUNET_CADET_Handle *cadet;
582 
587 
591 static struct Listener *listener_head;
592 
596 static struct Listener *listener_tail;
597 
601 static unsigned int num_clients;
602 
607 static int in_shutdown;
608 
614 static uint32_t suggest_id;
615 
616 
627 static int
629  uint32_t key,
630  void *value)
631 {
632  struct KeyEntry *k = value;
633 
634  GNUNET_assert (NULL != k);
635  if (GNUNET_YES == k->element->remote)
636  {
637  GNUNET_free (k->element);
638  k->element = NULL;
639  }
640  GNUNET_free (k);
641  return GNUNET_YES;
642 }
643 
644 
651 static void
652 send_client_done (void *cls)
653 {
654  struct Operation *op = cls;
655  struct GNUNET_MQ_Envelope *ev;
656  struct GNUNET_SETU_ResultMessage *rm;
657 
658  if (GNUNET_YES == op->client_done_sent)
659  return;
660  if (PHASE_DONE != op->phase)
661  {
663  "Union operation failed\n");
664  GNUNET_STATISTICS_update (_GSS_statistics,
665  "# Union operations failed",
666  1,
667  GNUNET_NO);
670  rm->request_id = htonl (op->client_request_id);
671  rm->element_type = htons (0);
672  GNUNET_MQ_send (op->set->cs->mq,
673  ev);
674  return;
675  }
676 
678 
679  GNUNET_STATISTICS_update (_GSS_statistics,
680  "# Union operations succeeded",
681  1,
682  GNUNET_NO);
684  "Signalling client that union operation is done\n");
685  ev = GNUNET_MQ_msg (rm,
687  rm->request_id = htonl (op->client_request_id);
689  rm->element_type = htons (0);
691  op->key_to_element));
692  GNUNET_MQ_send (op->set->cs->mq,
693  ev);
694 }
695 
696 
697 /* FIXME: the destroy logic is a mess and should be cleaned up! */
698 
711 static void
713 {
714  struct Set *set = op->set;
715  struct GNUNET_CADET_Channel *channel;
716 
718  "Destroying union operation %p\n",
719  op);
720  GNUNET_assert (NULL == op->listener);
721  /* check if the op was canceled twice */
722  if (NULL != op->remote_ibf)
723  {
724  ibf_destroy (op->remote_ibf);
725  op->remote_ibf = NULL;
726  }
727  if (NULL != op->demanded_hashes)
728  {
730  op->demanded_hashes = NULL;
731  }
732  if (NULL != op->local_ibf)
733  {
734  ibf_destroy (op->local_ibf);
735  op->local_ibf = NULL;
736  }
737  if (NULL != op->se)
738  {
740  op->se = NULL;
741  }
742  if (NULL != op->key_to_element)
743  {
746  NULL);
748  op->key_to_element = NULL;
749  }
750  if (NULL != set)
751  {
752  GNUNET_CONTAINER_DLL_remove (set->ops_head,
753  set->ops_tail,
754  op);
755  op->set = NULL;
756  }
757  if (NULL != op->context_msg)
758  {
759  GNUNET_free (op->context_msg);
760  op->context_msg = NULL;
761  }
762  if (NULL != (channel = op->channel))
763  {
764  /* This will free op; called conditionally as this helper function
765  is also called from within the channel disconnect handler. */
766  op->channel = NULL;
768  }
769  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
770  * there was a channel end handler that will free 'op' on the call stack. */
771 }
772 
773 
779 static void
781 
782 
788 static void
790 {
791  struct Listener *listener;
792 
794  "Destroying incoming operation %p\n",
795  op);
796  if (NULL != (listener = op->listener))
797  {
799  listener->op_tail,
800  op);
801  op->listener = NULL;
802  }
803  if (NULL != op->timeout_task)
804  {
806  op->timeout_task = NULL;
807  }
809 }
810 
811 
817 static void
819 {
820  struct GNUNET_CADET_Channel *channel;
821 
822  if (NULL != (channel = op->channel))
823  {
824  /* This will free op; called conditionally as this helper function
825  is also called from within the channel disconnect handler. */
826  op->channel = NULL;
828  }
829  if (NULL != op->listener)
830  {
831  incoming_destroy (op);
832  return;
833  }
834  if (NULL != op->set)
835  send_client_done (op);
837  GNUNET_free (op);
838 }
839 
840 
847 static void
849 {
850  struct GNUNET_MQ_Envelope *ev;
852 
854  "union operation failed\n");
857  msg->request_id = htonl (op->client_request_id);
858  msg->element_type = htons (0);
859  GNUNET_MQ_send (op->set->cs->mq,
860  ev);
862 }
863 
864 
872 static struct IBF_Key
873 get_ibf_key (const struct GNUNET_HashCode *src)
874 {
875  struct IBF_Key key;
876  uint16_t salt = 0;
877 
879  GNUNET_CRYPTO_kdf (&key, sizeof(key),
880  src, sizeof *src,
881  &salt, sizeof(salt),
882  NULL, 0));
883  return key;
884 }
885 
886 
890 struct GetElementContext
891 {
895  struct GNUNET_HashCode hash;
896 
900  struct KeyEntry *k;
901 };
902 
903 
914 static int
916  uint32_t key,
917  void *value)
918 {
919  struct GetElementContext *ctx = cls;
920  struct KeyEntry *k = value;
921 
922  GNUNET_assert (NULL != k);
924  &ctx->hash))
925  {
926  ctx->k = k;
927  return GNUNET_NO;
928  }
929  return GNUNET_YES;
930 }
931 
932 
941 static struct KeyEntry *
943  const struct GNUNET_HashCode *element_hash)
944 {
945  int ret;
946  struct IBF_Key ibf_key;
947  struct GetElementContext ctx = { { { 0 } }, 0 };
948 
949  ctx.hash = *element_hash;
950 
951  ibf_key = get_ibf_key (element_hash);
953  (uint32_t) ibf_key.key_val,
955  &ctx);
956 
957  /* was the iteration aborted because we found the element? */
958  if (GNUNET_SYSERR == ret)
959  {
960  GNUNET_assert (NULL != ctx.k);
961  return ctx.k;
962  }
963  return NULL;
964 }
965 
966 
981 static void
983  struct ElementEntry *ee,
984  int received)
985 {
986  struct IBF_Key ibf_key;
987  struct KeyEntry *k;
988 
989  ibf_key = get_ibf_key (&ee->element_hash);
990  k = GNUNET_new (struct KeyEntry);
991  k->element = ee;
992  k->ibf_key = ibf_key;
993  k->received = received;
996  (uint32_t) ibf_key.key_val,
997  k,
999 }
1000 
1001 
1005 static void
1006 salt_key (const struct IBF_Key *k_in,
1007  uint32_t salt,
1008  struct IBF_Key *k_out)
1009 {
1010  int s = salt % 64;
1011  uint64_t x = k_in->key_val;
1012 
1013  /* rotate ibf key */
1014  x = (x >> s) | (x << (64 - s));
1015  k_out->key_val = x;
1016 }
1017 
1018 
1022 static void
1023 unsalt_key (const struct IBF_Key *k_in,
1024  uint32_t salt,
1025  struct IBF_Key *k_out)
1026 {
1027  int s = salt % 64;
1028  uint64_t x = k_in->key_val;
1029 
1030  x = (x << s) | (x >> (64 - s));
1031  k_out->key_val = x;
1032 }
1033 
1034 
1042 static int
1044  uint32_t key,
1045  void *value)
1046 {
1047  struct Operation *op = cls;
1048  struct KeyEntry *ke = value;
1049  struct IBF_Key salted_key;
1050 
1052  "[OP %p] inserting %lx (hash %s) into ibf\n",
1053  op,
1054  (unsigned long) ke->ibf_key.key_val,
1055  GNUNET_h2s (&ke->element->element_hash));
1056  salt_key (&ke->ibf_key,
1057  op->salt_send,
1058  &salted_key);
1059  ibf_insert (op->local_ibf, salted_key);
1060  return GNUNET_YES;
1061 }
1062 
1063 
1071 static int
1073  struct Operation *op)
1074 {
1075  return ee->generation >= op->generation_created;
1076 }
1077 
1078 
1089 static int
1091  const struct GNUNET_HashCode *key,
1092  void *value)
1093 {
1094  struct Operation *op = cls;
1095  struct ElementEntry *ee = value;
1096 
1097  /* make sure that the element belongs to the set at the time
1098  * of creating the operation */
1099  if (GNUNET_NO ==
1101  op))
1102  return GNUNET_YES;
1103  GNUNET_assert (GNUNET_NO == ee->remote);
1104  op_register_element (op,
1105  ee,
1106  GNUNET_NO);
1107  return GNUNET_YES;
1108 }
1109 
1110 
1116 static void
1118 {
1119  unsigned int len;
1120 
1121  GNUNET_assert (NULL == op->key_to_element);
1126  op);
1127 }
1128 
1129 
1138 static int
1140  uint32_t size)
1141 {
1142  GNUNET_assert (NULL != op->key_to_element);
1143 
1144  if (NULL != op->local_ibf)
1145  ibf_destroy (op->local_ibf);
1146  op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
1147  if (NULL == op->local_ibf)
1148  {
1150  "Failed to allocate local IBF\n");
1151  return GNUNET_SYSERR;
1152  }
1155  op);
1156  return GNUNET_OK;
1157 }
1158 
1159 
1169 static int
1170 send_ibf (struct Operation *op,
1171  uint16_t ibf_order)
1172 {
1173  unsigned int buckets_sent = 0;
1174  struct InvertibleBloomFilter *ibf;
1175 
1176  if (GNUNET_OK !=
1177  prepare_ibf (op, 1 << ibf_order))
1178  {
1179  /* allocation failed */
1180  return GNUNET_SYSERR;
1181  }
1182 
1184  "sending ibf of size %u\n",
1185  1 << ibf_order);
1186 
1187  {
1188  char name[64] = { 0 };
1189  snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
1190  GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1191  }
1192 
1193  ibf = op->local_ibf;
1194 
1195  while (buckets_sent < (1 << ibf_order))
1196  {
1197  unsigned int buckets_in_message;
1198  struct GNUNET_MQ_Envelope *ev;
1199  struct IBFMessage *msg;
1200 
1201  buckets_in_message = (1 << ibf_order) - buckets_sent;
1202  /* limit to maximum */
1203  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1204  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1205 
1206  ev = GNUNET_MQ_msg_extra (msg,
1207  buckets_in_message * IBF_BUCKET_SIZE,
1209  msg->reserved1 = 0;
1210  msg->reserved2 = 0;
1211  msg->order = ibf_order;
1212  msg->offset = htonl (buckets_sent);
1213  msg->salt = htonl (op->salt_send);
1214  ibf_write_slice (ibf, buckets_sent,
1215  buckets_in_message, &msg[1]);
1216  buckets_sent += buckets_in_message;
1218  "ibf chunk size %u, %u/%u sent\n",
1219  buckets_in_message,
1220  buckets_sent,
1221  1 << ibf_order);
1222  GNUNET_MQ_send (op->mq, ev);
1223  }
1224 
1225  /* The other peer must decode the IBF, so
1226  * we're passive. */
1228  return GNUNET_OK;
1229 }
1230 
1231 
1239 static unsigned int
1240 get_order_from_difference (unsigned int diff)
1241 {
1242  unsigned int ibf_order;
1243 
1244  ibf_order = 2;
1245  while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
1246  ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
1247  (ibf_order < MAX_IBF_ORDER))
1248  ibf_order++;
1249  // add one for correction
1250  return ibf_order + 1;
1251 }
1252 
1253 
1263 static int
1265  const struct GNUNET_HashCode *key,
1266  void *value)
1267 {
1268  struct Operation *op = cls;
1269  struct GNUNET_SETU_ElementMessage *emsg;
1270  struct ElementEntry *ee = value;
1271  struct GNUNET_SETU_Element *el = &ee->element;
1272  struct GNUNET_MQ_Envelope *ev;
1273 
1275  "Sending element %s\n",
1276  GNUNET_h2s (key));
1277  ev = GNUNET_MQ_msg_extra (emsg,
1278  el->size,
1280  emsg->element_type = htons (el->element_type);
1281  GNUNET_memcpy (&emsg[1],
1282  el->data,
1283  el->size);
1284  GNUNET_MQ_send (op->mq,
1285  ev);
1286  return GNUNET_YES;
1287 }
1288 
1289 
1295 static void
1297 {
1298  struct GNUNET_MQ_Envelope *ev;
1299 
1300  op->phase = PHASE_FULL_SENDING;
1302  "Dedicing to transmit the full set\n");
1303  /* FIXME: use a more memory-friendly way of doing this with an
1304  iterator, just as we do in the non-full case! */
1307  op);
1309  GNUNET_MQ_send (op->mq,
1310  ev);
1311 }
1312 
1313 
1320 static int
1322  const struct StrataEstimatorMessage *msg)
1323 {
1324  struct Operation *op = cls;
1325  int is_compressed;
1326  size_t len;
1327 
1328  if (op->phase != PHASE_EXPECT_SE)
1329  {
1330  GNUNET_break (0);
1331  return GNUNET_SYSERR;
1332  }
1333  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1334  msg->header.type));
1335  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1336  if ((GNUNET_NO == is_compressed) &&
1338  {
1339  GNUNET_break (0);
1340  return GNUNET_SYSERR;
1341  }
1342  return GNUNET_OK;
1343 }
1344 
1345 
1352 static void
1354  const struct StrataEstimatorMessage *msg)
1355 {
1356  struct Operation *op = cls;
1357  struct StrataEstimator *remote_se;
1358  unsigned int diff;
1359  uint64_t other_size;
1360  size_t len;
1361  int is_compressed;
1362 
1363  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1364  msg->header.type));
1365  GNUNET_STATISTICS_update (_GSS_statistics,
1366  "# bytes of SE received",
1367  ntohs (msg->header.size),
1368  GNUNET_NO);
1369  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1370  other_size = GNUNET_ntohll (msg->set_size);
1372  SE_IBF_SIZE,
1373  SE_IBF_HASH_NUM);
1374  if (NULL == remote_se)
1375  {
1376  /* insufficient resources, fail */
1377  fail_union_operation (op);
1378  return;
1379  }
1380  if (GNUNET_OK !=
1381  strata_estimator_read (&msg[1],
1382  len,
1383  is_compressed,
1384  remote_se))
1385  {
1386  /* decompression failed */
1387  strata_estimator_destroy (remote_se);
1388  fail_union_operation (op);
1389  return;
1390  }
1391  GNUNET_assert (NULL != op->se);
1392  diff = strata_estimator_difference (remote_se,
1393  op->se);
1394 
1395  if (diff > 200)
1396  diff = diff * 3 / 2;
1397 
1398  strata_estimator_destroy (remote_se);
1400  op->se = NULL;
1402  "got se diff=%d, using ibf size %d\n",
1403  diff,
1404  1U << get_order_from_difference (diff));
1405 
1406  {
1407  char *set_debug;
1408 
1409  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1410  if ((NULL != set_debug) &&
1411  (0 == strcmp (set_debug, "1")))
1412  {
1413  FILE *f = fopen ("set.log", "a");
1414  fprintf (f, "%llu\n", (unsigned long long) diff);
1415  fclose (f);
1416  }
1417  }
1418 
1419  if ((GNUNET_YES == op->byzantine) &&
1420  (other_size < op->byzantine_lower_bound))
1421  {
1422  GNUNET_break (0);
1423  fail_union_operation (op);
1424  return;
1425  }
1426 
1427  if ((GNUNET_YES == op->force_full) ||
1428  (diff > op->initial_size / 4) ||
1429  (0 == other_size))
1430  {
1432  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
1433  diff,
1434  (unsigned long long) op->initial_size);
1435  GNUNET_STATISTICS_update (_GSS_statistics,
1436  "# of full sends",
1437  1,
1438  GNUNET_NO);
1439  if ((op->initial_size <= other_size) ||
1440  (0 == other_size))
1441  {
1442  send_full_set (op);
1443  }
1444  else
1445  {
1446  struct GNUNET_MQ_Envelope *ev;
1447 
1449  "Telling other peer that we expect its full set\n");
1450  op->phase = PHASE_EXPECT_IBF;
1451  ev = GNUNET_MQ_msg_header (
1453  GNUNET_MQ_send (op->mq,
1454  ev);
1455  }
1456  }
1457  else
1458  {
1459  GNUNET_STATISTICS_update (_GSS_statistics,
1460  "# of ibf sends",
1461  1,
1462  GNUNET_NO);
1463  if (GNUNET_OK !=
1464  send_ibf (op,
1465  get_order_from_difference (diff)))
1466  {
1467  /* Internal error, best we can do is shut the connection */
1469  "Failed to send IBF, closing connection\n");
1470  fail_union_operation (op);
1471  return;
1472  }
1473  }
1475 }
1476 
1477 
1485 static int
1487  uint32_t key,
1488  void *value)
1489 {
1490  struct SendElementClosure *sec = cls;
1491  struct Operation *op = sec->op;
1492  struct KeyEntry *ke = value;
1493  struct GNUNET_MQ_Envelope *ev;
1494  struct GNUNET_MessageHeader *mh;
1495 
1496  /* Detect 32-bit key collision for the 64-bit IBF keys. */
1497  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1498  return GNUNET_YES;
1499 
1500  ev = GNUNET_MQ_msg_header_extra (mh,
1501  sizeof(struct GNUNET_HashCode),
1503 
1504  GNUNET_assert (NULL != ev);
1505  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1507  "[OP %p] sending element offer (%s) to peer\n",
1508  op,
1509  GNUNET_h2s (&ke->element->element_hash));
1510  GNUNET_MQ_send (op->mq, ev);
1511  return GNUNET_YES;
1512 }
1513 
1514 
1521 static void
1523  struct IBF_Key ibf_key)
1524 {
1525  struct SendElementClosure send_cls;
1526 
1527  send_cls.ibf_key = ibf_key;
1528  send_cls.op = op;
1530  op->key_to_element,
1531  (uint32_t) ibf_key.
1532  key_val,
1534  &send_cls);
1535 }
1536 
1537 
1545 static int
1547 {
1548  struct IBF_Key key;
1549  struct IBF_Key last_key;
1550  int side;
1551  unsigned int num_decoded;
1552  struct InvertibleBloomFilter *diff_ibf;
1553 
1555 
1556  if (GNUNET_OK !=
1557  prepare_ibf (op,
1558  op->remote_ibf->size))
1559  {
1560  GNUNET_break (0);
1561  /* allocation failed */
1562  return GNUNET_SYSERR;
1563  }
1564  diff_ibf = ibf_dup (op->local_ibf);
1565  ibf_subtract (diff_ibf,
1566  op->remote_ibf);
1567 
1568  ibf_destroy (op->remote_ibf);
1569  op->remote_ibf = NULL;
1570 
1572  "decoding IBF (size=%u)\n",
1573  diff_ibf->size);
1574 
1575  num_decoded = 0;
1576  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1577 
1578  while (1)
1579  {
1580  int res;
1581  int cycle_detected = GNUNET_NO;
1582 
1583  last_key = key;
1584 
1585  res = ibf_decode (diff_ibf,
1586  &side,
1587  &key);
1588  if (res == GNUNET_OK)
1589  {
1591  "decoded ibf key %lx\n",
1592  (unsigned long) key.key_val);
1593  num_decoded += 1;
1594  if ((num_decoded > diff_ibf->size) ||
1595  ((num_decoded > 1) &&
1596  (last_key.key_val == key.key_val)))
1597  {
1599  "detected cyclic ibf (decoded %u/%u)\n",
1600  num_decoded,
1601  diff_ibf->size);
1602  cycle_detected = GNUNET_YES;
1603  }
1604  }
1605  if ((GNUNET_SYSERR == res) ||
1606  (GNUNET_YES == cycle_detected))
1607  {
1608  int next_order;
1609  next_order = 0;
1610  while (1 << next_order < diff_ibf->size)
1611  next_order++;
1612  next_order++;
1613  if (next_order <= MAX_IBF_ORDER)
1614  {
1616  "decoding failed, sending larger ibf (size %u)\n",
1617  1 << next_order);
1618  GNUNET_STATISTICS_update (_GSS_statistics,
1619  "# of IBF retries",
1620  1,
1621  GNUNET_NO);
1622  op->salt_send++;
1623  if (GNUNET_OK !=
1624  send_ibf (op, next_order))
1625  {
1626  /* Internal error, best we can do is shut the connection */
1628  "Failed to send IBF, closing connection\n");
1629  fail_union_operation (op);
1630  ibf_destroy (diff_ibf);
1631  return GNUNET_SYSERR;
1632  }
1633  }
1634  else
1635  {
1636  GNUNET_STATISTICS_update (_GSS_statistics,
1637  "# of failed union operations (too large)",
1638  1,
1639  GNUNET_NO);
1640  // XXX: Send the whole set, element-by-element
1642  "set union failed: reached ibf limit\n");
1643  fail_union_operation (op);
1644  ibf_destroy (diff_ibf);
1645  return GNUNET_SYSERR;
1646  }
1647  break;
1648  }
1649  if (GNUNET_NO == res)
1650  {
1651  struct GNUNET_MQ_Envelope *ev;
1652 
1654  "transmitted all values, sending DONE\n");
1656  GNUNET_MQ_send (op->mq, ev);
1657  /* We now wait until we get a DONE message back
1658  * and then wait for our MQ to be flushed and all our
1659  * demands be delivered. */
1660  break;
1661  }
1662  if (1 == side)
1663  {
1664  struct IBF_Key unsalted_key;
1665 
1666  unsalt_key (&key,
1667  op->salt_receive,
1668  &unsalted_key);
1669  send_offers_for_key (op,
1670  unsalted_key);
1671  }
1672  else if (-1 == side)
1673  {
1674  struct GNUNET_MQ_Envelope *ev;
1675  struct InquiryMessage *msg;
1676 
1677  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1678  * the effort additional complexity. */
1679  ev = GNUNET_MQ_msg_extra (msg,
1680  sizeof(struct IBF_Key),
1682  msg->salt = htonl (op->salt_receive);
1683  GNUNET_memcpy (&msg[1],
1684  &key,
1685  sizeof(struct IBF_Key));
1687  "sending element inquiry for IBF key %lx\n",
1688  (unsigned long) key.key_val);
1689  GNUNET_MQ_send (op->mq, ev);
1690  }
1691  else
1692  {
1693  GNUNET_assert (0);
1694  }
1695  }
1696  ibf_destroy (diff_ibf);
1697  return GNUNET_OK;
1698 }
1699 
1700 
1711 static int
1713  const struct IBFMessage *msg)
1714 {
1715  struct Operation *op = cls;
1716  unsigned int buckets_in_message;
1717 
1718  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1719  / IBF_BUCKET_SIZE;
1720  if (0 == buckets_in_message)
1721  {
1722  GNUNET_break_op (0);
1723  return GNUNET_SYSERR;
1724  }
1725  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1726  * IBF_BUCKET_SIZE)
1727  {
1728  GNUNET_break_op (0);
1729  return GNUNET_SYSERR;
1730  }
1731  if (op->phase == PHASE_EXPECT_IBF_CONT)
1732  {
1733  if (ntohl (msg->offset) != op->ibf_buckets_received)
1734  {
1735  GNUNET_break_op (0);
1736  return GNUNET_SYSERR;
1737  }
1738  if (1 << msg->order != op->remote_ibf->size)
1739  {
1740  GNUNET_break_op (0);
1741  return GNUNET_SYSERR;
1742  }
1743  if (ntohl (msg->salt) != op->salt_receive)
1744  {
1745  GNUNET_break_op (0);
1746  return GNUNET_SYSERR;
1747  }
1748  }
1749  else if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
1750  (op->phase != PHASE_EXPECT_IBF))
1751  {
1752  GNUNET_break_op (0);
1753  return GNUNET_SYSERR;
1754  }
1755 
1756  return GNUNET_OK;
1757 }
1758 
1759 
1769 static void
1771  const struct IBFMessage *msg)
1772 {
1773  struct Operation *op = cls;
1774  unsigned int buckets_in_message;
1775 
1776  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1777  / IBF_BUCKET_SIZE;
1778  if ((op->phase == PHASE_INVENTORY_PASSIVE) ||
1779  (op->phase == PHASE_EXPECT_IBF))
1780  {
1782  GNUNET_assert (NULL == op->remote_ibf);
1784  "Creating new ibf of size %u\n",
1785  1 << msg->order);
1786  op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1787  op->salt_receive = ntohl (msg->salt);
1789  "Receiving new IBF with salt %u\n",
1790  op->salt_receive);
1791  if (NULL == op->remote_ibf)
1792  {
1794  "Failed to parse remote IBF, closing connection\n");
1795  fail_union_operation (op);
1796  return;
1797  }
1798  op->ibf_buckets_received = 0;
1799  if (0 != ntohl (msg->offset))
1800  {
1801  GNUNET_break_op (0);
1802  fail_union_operation (op);
1803  return;
1804  }
1805  }
1806  else
1807  {
1810  "Received more of IBF\n");
1811  }
1812  GNUNET_assert (NULL != op->remote_ibf);
1813 
1814  ibf_read_slice (&msg[1],
1816  buckets_in_message,
1817  op->remote_ibf);
1818  op->ibf_buckets_received += buckets_in_message;
1819 
1820  if (op->ibf_buckets_received == op->remote_ibf->size)
1821  {
1823  "received full ibf\n");
1825  if (GNUNET_OK !=
1826  decode_and_send (op))
1827  {
1828  /* Internal error, best we can do is shut down */
1830  "Failed to decode IBF, closing connection\n");
1831  fail_union_operation (op);
1832  return;
1833  }
1834  }
1836 }
1837 
1838 
1847 static void
1849  const struct GNUNET_SETU_Element *element,
1851 {
1852  struct GNUNET_MQ_Envelope *ev;
1853  struct GNUNET_SETU_ResultMessage *rm;
1854 
1856  "sending element (size %u) to client\n",
1857  element->size);
1858  GNUNET_assert (0 != op->client_request_id);
1859  ev = GNUNET_MQ_msg_extra (rm,
1860  element->size,
1862  if (NULL == ev)
1863  {
1864  GNUNET_MQ_discard (ev);
1865  GNUNET_break (0);
1866  return;
1867  }
1868  rm->result_status = htons (status);
1869  rm->request_id = htonl (op->client_request_id);
1870  rm->element_type = htons (element->element_type);
1872  op->key_to_element));
1873  GNUNET_memcpy (&rm[1],
1874  element->data,
1875  element->size);
1876  GNUNET_MQ_send (op->set->cs->mq,
1877  ev);
1878 }
1879 
1880 
1886 static void
1888 {
1889  unsigned int num_demanded;
1890 
1891  num_demanded = GNUNET_CONTAINER_multihashmap_size (
1892  op->demanded_hashes);
1893 
1894  if (PHASE_FINISH_WAITING == op->phase)
1895  {
1897  "In PHASE_FINISH_WAITING, pending %u demands\n",
1898  num_demanded);
1899  if (0 == num_demanded)
1900  {
1901  struct GNUNET_MQ_Envelope *ev;
1902 
1903  op->phase = PHASE_DONE;
1905  GNUNET_MQ_send (op->mq,
1906  ev);
1907  /* We now wait until the other peer sends P2P_OVER
1908  * after it got all elements from us. */
1909  }
1910  }
1911  if (PHASE_FINISH_CLOSING == op->phase)
1912  {
1914  "In PHASE_FINISH_CLOSING, pending %u demands\n",
1915  num_demanded);
1916  if (0 == num_demanded)
1917  {
1918  op->phase = PHASE_DONE;
1919  send_client_done (op);
1921  }
1922  }
1923 }
1924 
1925 
1932 static int
1934  const struct GNUNET_SETU_ElementMessage *emsg)
1935 {
1936  struct Operation *op = cls;
1937 
1939  {
1940  GNUNET_break_op (0);
1941  return GNUNET_SYSERR;
1942  }
1943  return GNUNET_OK;
1944 }
1945 
1946 
1955 static void
1957  const struct GNUNET_SETU_ElementMessage *emsg)
1958 {
1959  struct Operation *op = cls;
1960  struct ElementEntry *ee;
1961  struct KeyEntry *ke;
1962  uint16_t element_size;
1963 
1964  element_size = ntohs (emsg->header.size) - sizeof(struct
1966  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
1967  GNUNET_memcpy (&ee[1],
1968  &emsg[1],
1969  element_size);
1970  ee->element.size = element_size;
1971  ee->element.data = &ee[1];
1972  ee->element.element_type = ntohs (emsg->element_type);
1973  ee->remote = GNUNET_YES;
1975  &ee->element_hash);
1976  if (GNUNET_NO ==
1978  &ee->element_hash,
1979  NULL))
1980  {
1981  /* We got something we didn't demand, since it's not in our map. */
1982  GNUNET_break_op (0);
1983  fail_union_operation (op);
1984  return;
1985  }
1986 
1988  "Got element (size %u, hash %s) from peer\n",
1989  (unsigned int) element_size,
1990  GNUNET_h2s (&ee->element_hash));
1991 
1992  GNUNET_STATISTICS_update (_GSS_statistics,
1993  "# received elements",
1994  1,
1995  GNUNET_NO);
1996  GNUNET_STATISTICS_update (_GSS_statistics,
1997  "# exchanged elements",
1998  1,
1999  GNUNET_NO);
2000 
2001  op->received_total++;
2002 
2003  ke = op_get_element (op,
2004  &ee->element_hash);
2005  if (NULL != ke)
2006  {
2007  /* Got repeated element. Should not happen since
2008  * we track demands. */
2009  GNUNET_STATISTICS_update (_GSS_statistics,
2010  "# repeated elements",
2011  1,
2012  GNUNET_NO);
2013  ke->received = GNUNET_YES;
2014  GNUNET_free (ee);
2015  }
2016  else
2017  {
2019  "Registering new element from remote peer\n");
2020  op->received_fresh++;
2021  op_register_element (op, ee, GNUNET_YES);
2022  /* only send results immediately if the client wants it */
2023  send_client_element (op,
2024  &ee->element,
2026  }
2027 
2028  if ((op->received_total > 8) &&
2029  (op->received_fresh < op->received_total / 3))
2030  {
2031  /* The other peer gave us lots of old elements, there's something wrong. */
2032  GNUNET_break_op (0);
2033  fail_union_operation (op);
2034  return;
2035  }
2037  maybe_finish (op);
2038 }
2039 
2040 
2047 static int
2049  const struct GNUNET_SETU_ElementMessage *emsg)
2050 {
2051  struct Operation *op = cls;
2052 
2053  (void) op;
2054  // FIXME: check that we expect full elements here?
2055  return GNUNET_OK;
2056 }
2057 
2058 
2065 static void
2067  const struct GNUNET_SETU_ElementMessage *emsg)
2068 {
2069  struct Operation *op = cls;
2070  struct ElementEntry *ee;
2071  struct KeyEntry *ke;
2072  uint16_t element_size;
2073 
2074  element_size = ntohs (emsg->header.size)
2075  - sizeof(struct GNUNET_SETU_ElementMessage);
2076  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2077  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
2078  ee->element.size = element_size;
2079  ee->element.data = &ee[1];
2080  ee->element.element_type = ntohs (emsg->element_type);
2081  ee->remote = GNUNET_YES;
2083  &ee->element_hash);
2085  "Got element (full diff, size %u, hash %s) from peer\n",
2086  (unsigned int) element_size,
2087  GNUNET_h2s (&ee->element_hash));
2088 
2089  GNUNET_STATISTICS_update (_GSS_statistics,
2090  "# received elements",
2091  1,
2092  GNUNET_NO);
2093  GNUNET_STATISTICS_update (_GSS_statistics,
2094  "# exchanged elements",
2095  1,
2096  GNUNET_NO);
2097 
2098  op->received_total++;
2099 
2100  ke = op_get_element (op,
2101  &ee->element_hash);
2102  if (NULL != ke)
2103  {
2104  /* Got repeated element. Should not happen since
2105  * we track demands. */
2106  GNUNET_STATISTICS_update (_GSS_statistics,
2107  "# repeated elements",
2108  1,
2109  GNUNET_NO);
2110  ke->received = GNUNET_YES;
2111  GNUNET_free (ee);
2112  }
2113  else
2114  {
2116  "Registering new element from remote peer\n");
2117  op->received_fresh++;
2118  op_register_element (op, ee, GNUNET_YES);
2119  /* only send results immediately if the client wants it */
2120  send_client_element (op,
2121  &ee->element,
2123  }
2124 
2125  if ((GNUNET_YES == op->byzantine) &&
2126  (op->received_total > 384 + op->received_fresh * 4) &&
2127  (op->received_fresh < op->received_total / 6))
2128  {
2129  /* The other peer gave us lots of old elements, there's something wrong. */
2131  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
2132  (unsigned long long) op->received_fresh,
2133  (unsigned long long) op->received_total);
2134  GNUNET_break_op (0);
2135  fail_union_operation (op);
2136  return;
2137  }
2139 }
2140 
2141 
/**
 * Validate an inquiry message from the remote peer before it is handled.
 * NOTE(review): the listing line carrying the function name and first
 * parameter (2150) is missing; presumably
 * check_union_p2p_inquiry (void *cls, ...) -- confirm against the real file.
 *
 * @param cls the `struct Operation *` the message was received for
 * @param msg the inquiry message
 * @return #GNUNET_OK if the operation is in PHASE_INVENTORY_PASSIVE and the
 *         payload is a whole number of `struct IBF_Key`s,
 *         #GNUNET_SYSERR otherwise
 */
2149 static int
2151  const struct InquiryMessage *msg)
2152 {
2153  struct Operation *op = cls;
2154  unsigned int num_keys;
2155 
2156  if (op->phase != PHASE_INVENTORY_PASSIVE)
2157  {
2158  GNUNET_break_op (0);
2159  return GNUNET_SYSERR;
2160  }
2161  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2162  / sizeof(struct IBF_Key);
  /* reject payloads with trailing bytes that do not form a full key */
2163  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2164  != num_keys * sizeof(struct IBF_Key))
2165  {
2166  GNUNET_break_op (0);
2167  return GNUNET_SYSERR;
2168  }
2169  return GNUNET_OK;
2170 }
2171 
2172 
/**
 * Process an inquiry message: for every IBF key in the payload, remove the
 * salt given in the message and call send_offers_for_key() with the result.
 * NOTE(review): listing lines 2180 (function name / first parameter), 2187
 * (opening of the log call) and 2203 (last statement before the closing
 * brace) are missing from this listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the inquiry message; keys follow the fixed-size header
 */
2179 static void
2181  const struct InquiryMessage *msg)
2182 {
2183  struct Operation *op = cls;
2184  const struct IBF_Key *ibf_key;
2185  unsigned int num_keys;
2186 
2188  "Received union inquiry\n");
2189  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2190  / sizeof(struct IBF_Key);
2191  ibf_key = (const struct IBF_Key *) &msg[1];
2192  while (0 != num_keys--)
2193  {
2194  struct IBF_Key unsalted_key;
2195 
2196  unsalt_key (ibf_key,
2197  ntohl (msg->salt),
2198  &unsalted_key);
2199  send_offers_for_key (op,
2200  unsalted_key);
2201  ibf_key++;
2202  }
2204 }
2205 
2206 
/**
 * Map iterator that sends the element of one key entry to the remote peer,
 * unless the entry is marked as received.
 * NOTE(review): listing lines 2218 (function name / first parameter) and
 * 2232 (message type argument of GNUNET_MQ_msg_extra) are missing; the name
 * is presumably send_missing_full_elements_iter -- confirm.
 *
 * @param cls the `struct Operation *`
 * @param key map key of the entry (unused in the visible code)
 * @param value the `struct KeyEntry *`
 * @return always #GNUNET_YES
 */
2217 static int
2219  uint32_t key,
2220  void *value)
2221 {
2222  struct Operation *op = cls;
2223  struct KeyEntry *ke = value;
2224  struct GNUNET_MQ_Envelope *ev;
2225  struct GNUNET_SETU_ElementMessage *emsg;
2226  struct ElementEntry *ee = ke->element;
2227 
  /* entries flagged as received are skipped, nothing is sent for them */
2228  if (GNUNET_YES == ke->received)
2229  return GNUNET_YES;
2230  ev = GNUNET_MQ_msg_extra (emsg,
2231  ee->element.size,
2233  GNUNET_memcpy (&emsg[1],
2234  ee->element.data,
2235  ee->element.size);
2236  emsg->element_type = htons (ee->element.element_type);
2237  GNUNET_MQ_send (op->mq,
2238  ev);
2239  return GNUNET_YES;
2240 }
2241 
2242 
/**
 * Handle the remote peer's request for a full set transmission.
 * Fails the operation unless it is in PHASE_EXPECT_IBF, otherwise calls
 * send_full_set().
 * NOTE(review): listing lines 2250 (function name / first parameter), 2255
 * (opening of the log call) and 2267 (last statement) are missing here.
 *
 * @param cls the `struct Operation *`
 * @param mh the request message (not inspected in the visible code)
 */
2249 static void
2251  const struct GNUNET_MessageHeader *mh)
2252 {
2253  struct Operation *op = cls;
2254 
2256  "Received request for full set transmission\n");
2257  if (PHASE_EXPECT_IBF != op->phase)
2258  {
2259  GNUNET_break_op (0);
2260  fail_union_operation (op);
2261  return;
2262  }
2263 
2264  // FIXME: we need to check that our set is larger than the
2265  // byzantine_lower_bound by some threshold
2266  send_full_set (op);
2268 }
2269 
2270 
/**
 * Handle a "full done" message from the remote peer.
 * In PHASE_EXPECT_IBF an envelope is sent to the peer and the phase becomes
 * PHASE_DONE; in PHASE_FULL_SENDING the phase becomes PHASE_DONE and
 * send_client_done() is called; any other phase fails the operation.
 * NOTE(review): several listing lines are missing (function name, log call
 * openers, the iteration call at 2293-2294, the envelope construction at
 * 2296, and statements at 2310, 2312 and 2325).
 *
 * @param cls the `struct Operation *`
 * @param mh the message (not inspected in the visible code)
 */
2277 static void
2279  const struct GNUNET_MessageHeader *mh)
2280 {
2281  struct Operation *op = cls;
2282 
2283  switch (op->phase)
2284  {
2285  case PHASE_EXPECT_IBF:
2286  {
2287  struct GNUNET_MQ_Envelope *ev;
2288 
2290  "got FULL DONE, sending elements that other peer is missing\n");
2291 
2292  /* send all the elements that did not come from the remote peer */
2295  op);
2297  GNUNET_MQ_send (op->mq,
2298  ev);
2299  op->phase = PHASE_DONE;
2300  /* we now wait until the other peer sends us the OVER message*/
2301  }
2302  break;
2303 
2304  case PHASE_FULL_SENDING:
2305  {
2307  "got FULL DONE, finishing\n");
2308  /* We sent the full set, and got the response for that. We're done. */
2309  op->phase = PHASE_DONE;
2311  send_client_done (op);
2313  return;
2314  }
2315  break;
2316 
2317  default:
2319  "Handle full done phase is %u\n",
2320  (unsigned) op->phase);
2321  GNUNET_break_op (0);
2322  fail_union_operation (op);
2323  return;
2324  }
2326 }
2327 
2328 
/**
 * Check a demand message: the payload after the message header must be a
 * whole number of `struct GNUNET_HashCode`s.
 * NOTE(review): the listing line with the function name and first parameter
 * (2338) is missing here.
 *
 * @param cls the `struct Operation *` (unused apart from the void cast)
 * @param mh the demand message
 * @return #GNUNET_OK if the size is well-formed, #GNUNET_SYSERR otherwise
 */
2337 static int
2339  const struct GNUNET_MessageHeader *mh)
2340 {
2341  struct Operation *op = cls;
2342  unsigned int num_hashes;
2343 
2344  (void) op;
2345  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2346  / sizeof(struct GNUNET_HashCode);
2347  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2348  != num_hashes * sizeof(struct GNUNET_HashCode))
2349  {
2350  GNUNET_break_op (0);
2351  return GNUNET_SYSERR;
2352  }
2353  return GNUNET_OK;
2354 }
2355 
2356 
/**
 * Answer a demand message: for every hash in the payload, send the matching
 * element to the remote peer.  The operation is failed if an element lookup
 * yields NULL or the element is not part of this operation.  For symmetric
 * operations send_client_element() is additionally called per element.
 * NOTE(review): missing listing lines include the function name (2365), the
 * lookup that assigns `ee` (2381), the message type (2399), the log call
 * opener (2405), the last argument of send_client_element (2418) and the
 * final statement (2420).
 *
 * @param cls the `struct Operation *`
 * @param mh the demand message; hashes follow the header
 */
2364 static void
2366  const struct GNUNET_MessageHeader *mh)
2367 {
2368  struct Operation *op = cls;
2369  struct ElementEntry *ee;
2370  struct GNUNET_SETU_ElementMessage *emsg;
2371  const struct GNUNET_HashCode *hash;
2372  unsigned int num_hashes;
2373  struct GNUNET_MQ_Envelope *ev;
2374 
2375  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2376  / sizeof(struct GNUNET_HashCode);
2377  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2378  num_hashes > 0;
2379  hash++, num_hashes--)
2380  {
2382  hash);
2383  if (NULL == ee)
2384  {
2385  /* Demand for non-existing element. */
2386  GNUNET_break_op (0);
2387  fail_union_operation (op);
2388  return;
2389  }
2390  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2391  {
2392  /* Probably confused lazily copied sets. */
2393  GNUNET_break_op (0);
2394  fail_union_operation (op);
2395  return;
2396  }
2397  ev = GNUNET_MQ_msg_extra (emsg,
2398  ee->element.size,
2400  GNUNET_memcpy (&emsg[1],
2401  ee->element.data,
2402  ee->element.size);
2403  emsg->reserved = htons (0);
2404  emsg->element_type = htons (ee->element.element_type);
2406  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
2407  op,
2408  (unsigned int) ee->element.size,
2409  GNUNET_h2s (&ee->element_hash));
2410  GNUNET_MQ_send (op->mq, ev);
2411  GNUNET_STATISTICS_update (_GSS_statistics,
2412  "# exchanged elements",
2413  1,
2414  GNUNET_NO);
2415  if (op->symmetric)
2416  send_client_element (op,
2417  &ee->element,
2419  }
2421 }
2422 
2423 
/**
 * Check an offer message: the operation must be in one of the two inventory
 * phases and the payload must be a whole number of `struct GNUNET_HashCode`s.
 * NOTE(review): the listing line with the function name and first parameter
 * (2432) is missing here.
 *
 * @param cls the `struct Operation *`
 * @param mh the offer message
 * @return #GNUNET_OK if well-formed and expected, #GNUNET_SYSERR otherwise
 */
2431 static int
2433  const struct GNUNET_MessageHeader *mh)
2434 {
2435  struct Operation *op = cls;
2436  unsigned int num_hashes;
2437 
2438  /* look up elements and send them */
2439  if ((op->phase != PHASE_INVENTORY_PASSIVE) &&
2440  (op->phase != PHASE_INVENTORY_ACTIVE))
2441  {
2442  GNUNET_break_op (0);
2443  return GNUNET_SYSERR;
2444  }
2445  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2446  / sizeof(struct GNUNET_HashCode);
2447  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2448  num_hashes * sizeof(struct GNUNET_HashCode))
2449  {
2450  GNUNET_break_op (0);
2451  return GNUNET_SYSERR;
2452  }
2453  return GNUNET_OK;
2454 }
2455 
2456 
/**
 * Process an offer message: for each offered hash, skip it if the element
 * found for it already belongs to this operation or if a duplicate-demand
 * check returns #GNUNET_YES; otherwise send a demand message carrying that
 * single hash to the remote peer.
 * NOTE(review): missing listing lines include the function name (2465), the
 * lookup assigning `ee` (2482), the duplicate check call (2489), the
 * insertion into op->demanded_hashes (2497-2498, 2502), log call openers,
 * the demand message type (2509) and the final statement (2515).
 *
 * @param cls the `struct Operation *`
 * @param mh the offer message; hashes follow the header
 */
2464 static void
2466  const struct GNUNET_MessageHeader *mh)
2467 {
2468  struct Operation *op = cls;
2469  const struct GNUNET_HashCode *hash;
2470  unsigned int num_hashes;
2471 
2472  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2473  / sizeof(struct GNUNET_HashCode);
2474  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2475  num_hashes > 0;
2476  hash++, num_hashes--)
2477  {
2478  struct ElementEntry *ee;
2479  struct GNUNET_MessageHeader *demands;
2480  struct GNUNET_MQ_Envelope *ev;
2481 
2483  hash);
2484  if (NULL != ee)
2485  if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2486  continue;
2487 
2488  if (GNUNET_YES ==
2490  hash))
2491  {
2493  "Skipped sending duplicate demand\n");
2494  continue;
2495  }
2496 
2499  op->demanded_hashes,
2500  hash,
2501  NULL,
2503 
2505  "[OP %p] Requesting element (hash %s)\n",
2506  op, GNUNET_h2s (hash));
2507  ev = GNUNET_MQ_msg_header_extra (demands,
2508  sizeof(struct GNUNET_HashCode),
2510  GNUNET_memcpy (&demands[1],
2511  hash,
2512  sizeof(struct GNUNET_HashCode));
2513  GNUNET_MQ_send (op->mq, ev);
2514  }
2516 }
2517 
2518 
/**
 * Handle a DONE message from the remote peer, dispatching on op->phase.
 * Two branches call maybe_finish(); one of them first switches the phase to
 * PHASE_FINISH_CLOSING.  Any other phase fails the operation.
 * NOTE(review): this listing is missing the function name (2526), both
 * `case` labels and their log call openers (2533, 2535-2536, 2549-2550),
 * the terminator of the first long comment (2546) and the statement at
 * 2559, so the text below is not valid C as shown.
 *
 * @param cls the `struct Operation *`
 * @param mh the message (not inspected in the visible code)
 */
2525 static void
2527  const struct GNUNET_MessageHeader *mh)
2528 {
2529  struct Operation *op = cls;
2530 
2531  switch (op->phase)
2532  {
2534  /* We got all requests, but still have to send our elements in response. */
2537  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2538  /* The active peer is done sending offers
2539  * and inquiries. This means that all
2540  * our responses to that (demands and offers)
2541  * must be in flight (queued or in mesh).
2542  *
2543  * We should notify the active peer once
2544  * all our demands are satisfied, so that the active
2545  * peer can quit if we gave it everything.
2547  maybe_finish (op);
2548  return;
2551  "got DONE (as active partner), waiting to finish\n");
2552  /* All demands of the other peer are satisfied,
2553  * and we processed all offers, thus we know
2554  * exactly what our demands must be.
2555  *
2556  * We'll close the channel
2557  * to the other peer once our demands are met.
2558  */op->phase = PHASE_FINISH_CLOSING;
2560  maybe_finish (op);
2561  return;
2562  default:
2563  GNUNET_break_op (0);
2564  fail_union_operation (op);
2565  return;
2566  }
2567 }
2568 
2569 
/**
 * Handle an OVER message from the remote peer by calling
 * send_client_done() on the operation.
 * NOTE(review): the listing line with the function name and first parameter
 * (2577) is missing here.
 *
 * @param cls the operation, passed straight to send_client_done()
 * @param mh the message (unused)
 */
2576 static void
2578  const struct GNUNET_MessageHeader *mh)
2579 {
2580  send_client_done (cls);
2581 }
2582 
2583 
2592 static struct Operation *
2593 get_incoming (uint32_t id)
2594 {
2595  for (struct Listener *listener = listener_head;
2596  NULL != listener;
2597  listener = listener->next)
2598  {
2599  for (struct Operation *op = listener->op_head;
2600  NULL != op;
2601  op = op->next)
2602  if (op->suggest_id == id)
2603  return op;
2604  }
2605  return NULL;
2606 }
2607 
2608 
/**
 * Called when a client connects: counts the client and allocates the
 * per-client state that remembers the client handle and its message queue.
 * NOTE(review): the listing line with the function name and first parameter
 * (2618) is missing here.
 *
 * @param c the client handle
 * @param mq message queue for talking to @a c
 * @return the newly allocated `struct ClientState`
 */
2617 static void *
2619  struct GNUNET_SERVICE_Client *c,
2620  struct GNUNET_MQ_Handle *mq)
2621 {
2622  struct ClientState *cs;
2623 
2624  num_clients++;
2625  cs = GNUNET_new (struct ClientState);
2626  cs->client = c;
2627  cs->mq = mq;
2628  return cs;
2629 }
2630 
2631 
/**
 * Hash map iterator that frees the element entry stored as the value.
 * NOTE(review): the listing line with the function name and first parameter
 * (2641) is missing; presumably destroy_elements_iterator -- confirm.
 *
 * @param key key of the entry (unused)
 * @param value the `struct ElementEntry *` to free
 * @return always #GNUNET_YES
 */
2640 static int
2642  const struct GNUNET_HashCode *key,
2643  void *value)
2644 {
2645  struct ElementEntry *ee = value;
2646 
2647  GNUNET_free (ee);
2648  return GNUNET_YES;
2649 }
2650 
2651 
/**
 * Clean up after a client has disconnected.
 * Destroys the client's set (pending operations, strata estimator, and the
 * set content once its reference count drops to zero) and the client's
 * listener (CADET port and all incoming operations), frees the client
 * state, and disconnects from CADET when the service is shutting down and
 * this was the last client.
 * NOTE(review): missing listing lines include the function name and first
 * parameter (2660), log call openers, and the element iteration / map
 * destruction around 2694-2697.
 *
 * @param client the client handle (unused in the visible code)
 * @param internal_cls the `struct ClientState *` of the client
 */
2659 static void
2661  struct GNUNET_SERVICE_Client *client,
2662  void *internal_cls)
2663 {
2664  struct ClientState *cs = internal_cls;
2665  struct Operation *op;
2666  struct Listener *listener;
2667  struct Set *set;
2668 
2670  "Client disconnected, cleaning up\n");
2671  if (NULL != (set = cs->set))
2672  {
2673  struct SetContent *content = set->content;
2674 
2676  "Destroying client's set\n");
2677  /* Destroy pending set operations */
2678  while (NULL != set->ops_head)
2679  _GSS_operation_destroy (set->ops_head);
2680 
2681  /* Destroy operation-specific state */
2682  if (NULL != set->se)
2683  {
2684  strata_estimator_destroy (set->se);
2685  set->se = NULL;
2686  }
2687  /* free set content (or at least decrement RC) */
2688  set->content = NULL;
2689  GNUNET_assert (0 != content->refcount);
2690  content->refcount--;
2691  if (0 == content->refcount)
2692  {
2693  GNUNET_assert (NULL != content->elements);
2696  NULL);
2698  content->elements = NULL;
2699  GNUNET_free (content);
2700  }
2701  GNUNET_free (set);
2702  }
2703 
2704  if (NULL != (listener = cs->listener))
2705  {
2707  "Destroying client's listener\n");
2708  GNUNET_CADET_close_port (listener->open_port);
2709  listener->open_port = NULL;
2710  while (NULL != (op = listener->op_head))
2711  {
2713  "Destroying incoming operation `%u' from peer `%s'\n",
2714  (unsigned int) op->client_request_id,
2715  GNUNET_i2s (&op->peer));
2716  incoming_destroy (op);
2717  }
2718  GNUNET_CONTAINER_DLL_remove (listener_head,
2719  listener_tail,
2720  listener);
2721  GNUNET_free (listener);
2722  }
2723  GNUNET_free (cs);
2724  num_clients--;
2725  if ( (GNUNET_YES == in_shutdown) &&
2726  (0 == num_clients) )
2727  {
2728  if (NULL != cadet)
2729  {
2730  GNUNET_CADET_disconnect (cadet);
2731  cadet = NULL;
2732  }
2733  }
2734 }
2735 
2736 
/**
 * Check an operation request from a remote peer.
 * Rejects the message if the operation already has a suggest id (a second
 * request on the same channel), if it has no listener, or if the nested
 * context message exceeds GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE.
 * NOTE(review): the listing line with the function name and first parameter
 * (2746) is missing here.
 *
 * @param cls the `struct Operation *`
 * @param msg the operation request
 * @return #GNUNET_OK if the request is acceptable, #GNUNET_SYSERR otherwise
 */
2745 static int
2747  const struct OperationRequestMessage *msg)
2748 {
2749  struct Operation *op = cls;
2750  struct Listener *listener = op->listener;
2751  const struct GNUNET_MessageHeader *nested_context;
2752 
2753  /* double operation request */
2754  if (0 != op->suggest_id)
2755  {
2756  GNUNET_break_op (0);
2757  return GNUNET_SYSERR;
2758  }
2759  /* This should be equivalent to the previous condition, but can't hurt to check twice */
2760  if (NULL == listener)
2761  {
2762  GNUNET_break (0);
2763  return GNUNET_SYSERR;
2764  }
2765  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2766  if ((NULL != nested_context) &&
2767  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2768  {
2769  GNUNET_break_op (0);
2770  return GNUNET_SYSERR;
2771  }
2772  return GNUNET_OK;
2773 }
2774 
2775 
/**
 * Handle an operation request from a remote peer: keep a copy of the nested
 * context message, record the peer's element count, assign a non-zero
 * suggest id, clear the timeout task, and forward the request to the
 * listening client's message queue.
 * NOTE(review): missing listing lines include the function name (2792), log
 * levels (2809, 2823), the statement between the timeout assert and the
 * reset to NULL (2817) and the message type for the client message (2820).
 *
 * @param cls the `struct Operation *`
 * @param msg the operation request
 */
2791 static void
2793  const struct OperationRequestMessage *msg)
2794 {
2795  struct Operation *op = cls;
2796  struct Listener *listener = op->listener;
2797  const struct GNUNET_MessageHeader *nested_context;
2798  struct GNUNET_MQ_Envelope *env;
2799  struct GNUNET_SETU_RequestMessage *cmsg;
2800 
2801  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2802  /* Make a copy of the nested_context (application-specific context
2803  information that is opaque to set) so we can pass it to the
2804  listener later on */
2805  if (NULL != nested_context)
2806  op->context_msg = GNUNET_copy_message (nested_context);
2807  op->remote_element_count = ntohl (msg->element_count);
2808  GNUNET_log (
2810  "Received P2P operation request (port %s) for active listener\n",
2811  GNUNET_h2s (&op->listener->app_id));
2812  GNUNET_assert (0 == op->suggest_id);
  /* 0 means "no id assigned" (see the assert above), so skip it */
2813  if (0 == suggest_id)
2814  suggest_id++;
2815  op->suggest_id = suggest_id++;
2816  GNUNET_assert (NULL != op->timeout_task);
2818  op->timeout_task = NULL;
2819  env = GNUNET_MQ_msg_nested_mh (cmsg,
2821  op->context_msg);
2822  GNUNET_log (
2824  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
2825  op->suggest_id,
2826  listener,
2827  listener->cs);
2828  cmsg->accept_id = htonl (op->suggest_id);
2829  cmsg->peer_id = op->peer;
2830  GNUNET_MQ_send (listener->cs->mq,
2831  env);
2832  /* NOTE: GNUNET_CADET_receive_done() will be called in
2833  #handle_client_accept() */
2834 }
2835 
2836 
/**
 * Handle a client's request to create a set.
 * A client may own at most one set.  On success the new set gets a strata
 * estimator and a fresh content object (reference count 1, empty element
 * map) and is linked to the client state.
 * NOTE(review): missing listing lines include the function name (2846), log
 * call openers, the statements on the two error paths (2858, 2873), the
 * strata estimator creation call with its first argument (2865) and the
 * final statement (2884).
 *
 * @param cls the `struct ClientState *`
 * @param msg the create message (not inspected in the visible code)
 */
2845 static void
2847  const struct GNUNET_SETU_CreateMessage *msg)
2848 {
2849  struct ClientState *cs = cls;
2850  struct Set *set;
2851 
2853  "Client created new set for union operation\n");
2854  if (NULL != cs->set)
2855  {
2856  /* There can only be one set per client */
2857  GNUNET_break (0);
2859  return;
2860  }
2861  set = GNUNET_new (struct Set);
2862  {
2863  struct StrataEstimator *se;
2864 
2866  SE_IBF_SIZE,
2867  SE_IBF_HASH_NUM);
2868  if (NULL == se)
2869  {
2871  "Failed to allocate strata estimator\n");
2872  GNUNET_free (set);
2874  return;
2875  }
2876  set->se = se;
2877  }
2878  set->content = GNUNET_new (struct SetContent);
2879  set->content->refcount = 1;
2880  set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
2881  GNUNET_YES);
2882  set->cs = cs;
2883  cs->set = set;
2885 }
2886 
2887 
/**
 * Timeout task for an incoming operation: clears the task handle stored in
 * the operation and destroys the operation via incoming_destroy().
 * NOTE(review): listing lines 2898 (function name and parameter) and 2903
 * (log call opener) are missing; presumably incoming_timeout_cb (void *cls)
 * -- confirm.
 *
 * @param cls the `struct Operation *` that timed out
 */
2897 static void
2899 {
2900  struct Operation *op = cls;
2901 
2902  op->timeout_task = NULL;
2904  "Remote peer's incoming request timed out\n");
2905  incoming_destroy (op);
2906 }
2907 
2908 
/**
 * Called for a new incoming CADET channel on a listener's port: allocates an
 * operation bound to that listener, the source peer and the channel, and
 * returns it as the channel's context.
 * NOTE(review): missing listing lines include the log call opener (2933),
 * the calls whose trailing arguments are `UINT32_MAX` (2940) and `op`
 * (2942-2943), and the opener of the DLL insertion (2945).
 *
 * @param cls the `struct Listener *` that opened the port
 * @param channel the new channel
 * @param source identity of the peer that opened the channel
 * @return the new `struct Operation *`
 */
2925 static void *
2926 channel_new_cb (void *cls,
2927  struct GNUNET_CADET_Channel *channel,
2928  const struct GNUNET_PeerIdentity *source)
2929 {
2930  struct Listener *listener = cls;
2931  struct Operation *op;
2932 
2934  "New incoming channel\n");
2935  op = GNUNET_new (struct Operation);
2936  op->listener = listener;
2937  op->peer = *source;
2938  op->channel = channel;
2939  op->mq = GNUNET_CADET_get_mq (op->channel);
2941  UINT32_MAX);
2944  op);
2946  listener->op_tail,
2947  op);
2948  return op;
2949 }
2950 
2951 
/**
 * Called when a CADET channel is destroyed: forgets the channel handle
 * stored in the operation.
 * NOTE(review): listing line 2975 (the statement after clearing the
 * channel) is missing here.
 *
 * @param channel_ctx the `struct Operation *` associated with the channel
 * @param channel the channel that ended (unused in the visible code)
 */
2968 static void
2969 channel_end_cb (void *channel_ctx,
2970  const struct GNUNET_CADET_Channel *channel)
2971 {
2972  struct Operation *op = channel_ctx;
2973 
2974  op->channel = NULL;
2976 }
2977 
2978 
/**
 * Window-size callback; intentionally a no-op (see the FIXME in the body).
 * NOTE(review): the listing line with the function name and first parameter
 * (2994) is missing; presumably channel_window_cb -- confirm.
 *
 * @param channel the channel (unused)
 * @param window_size the window size (unused)
 */
2993 static void
2995  const struct GNUNET_CADET_Channel *channel,
2996  int window_size)
2997 {
2998  /* FIXME: not implemented, we could do flow control here... */
2999 }
3000 
3001 
/**
 * Handle a client's request to listen for incoming operations.
 * A client may have at most one listener.  Creates the listener, adds it to
 * the global listener list and opens a CADET port for the application id in
 * the message, registering the P2P message handlers below.
 * NOTE(review): the listing has lost the function name (3009), every
 * message-type argument and some struct arguments of the handler table, the
 * table terminator (3066), the statement on the error path (3074), the log
 * call opener (3084), one argument of GNUNET_CADET_open_port (3091) and the
 * final statement (3094).
 *
 * @param cls the `struct ClientState *`
 * @param msg the listen message carrying the application id
 */
3008 static void
3010  const struct GNUNET_SETU_ListenMessage *msg)
3011 {
3012  struct ClientState *cs = cls;
3013  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3014  GNUNET_MQ_hd_var_size (incoming_msg,
3016  struct OperationRequestMessage,
3017  NULL),
3018  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3020  struct IBFMessage,
3021  NULL),
3022  GNUNET_MQ_hd_var_size (union_p2p_elements,
3025  NULL),
3026  GNUNET_MQ_hd_var_size (union_p2p_offer,
3028  struct GNUNET_MessageHeader,
3029  NULL),
3030  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3032  struct InquiryMessage,
3033  NULL),
3034  GNUNET_MQ_hd_var_size (union_p2p_demand,
3036  struct GNUNET_MessageHeader,
3037  NULL),
3038  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3040  struct GNUNET_MessageHeader,
3041  NULL),
3042  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3044  struct GNUNET_MessageHeader,
3045  NULL),
3046  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3048  struct GNUNET_MessageHeader,
3049  NULL),
3050  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3052  struct GNUNET_MessageHeader,
3053  NULL),
3054  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3056  struct StrataEstimatorMessage,
3057  NULL),
3058  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3060  struct StrataEstimatorMessage,
3061  NULL),
3062  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3065  NULL),
3067  };
3068  struct Listener *listener;
3069 
3070  if (NULL != cs->listener)
3071  {
3072  /* max. one active listener per client! */
3073  GNUNET_break (0);
3075  return;
3076  }
3077  listener = GNUNET_new (struct Listener);
3078  listener->cs = cs;
3079  cs->listener = listener;
3080  listener->app_id = msg->app_id;
3081  GNUNET_CONTAINER_DLL_insert (listener_head,
3082  listener_tail,
3083  listener);
3085  "New listener created (port %s)\n",
3086  GNUNET_h2s (&listener->app_id));
3087  listener->open_port = GNUNET_CADET_open_port (cadet,
3088  &msg->app_id,
3089  &channel_new_cb,
3090  listener,
3092  &channel_end_cb,
3093  cadet_handlers);
3095 }
3096 
3097 
/**
 * Handle a client's rejection of an incoming operation, looked up by the
 * accept/reject id in the message.  An unknown id is only logged.
 * NOTE(review): missing listing lines include the function name (3106), log
 * call openers, the statement before the early return (3120) and the two
 * final statements (3126-3127).
 * NOTE(review): get_incoming() searches the operations of all listeners,
 * while the log below dereferences cs->listener; confirm that a client
 * without its own listener cannot reach that line.
 *
 * @param cls the `struct ClientState *`
 * @param msg the reject message
 */
3105 static void
3107  const struct GNUNET_SETU_RejectMessage *msg)
3108 {
3109  struct ClientState *cs = cls;
3110  struct Operation *op;
3111 
3112  op = get_incoming (ntohl (msg->accept_reject_id));
3113  if (NULL == op)
3114  {
3115  /* no matching incoming operation for this reject;
3116  could be that the other peer already disconnected... */
3118  "Client rejected unknown operation %u\n",
3119  (unsigned int) ntohl (msg->accept_reject_id));
3121  return;
3122  }
3124  "Peer request (app %s) rejected by client\n",
3125  GNUNET_h2s (&cs->listener->app_id));
3128 }
3129 
3130 
/**
 * Check an "add element" message from a client.  No validation is performed
 * here; the message is always accepted.
 * NOTE(review): the listing line with the function name and first parameter
 * (3138) is missing here.
 *
 * @param msg the element message (not inspected)
 * @return always #GNUNET_OK
 */
3137 static int
3139  const struct GNUNET_SETU_ElementMessage *msg)
3140 {
3141  /* NOTE: Technically, we should probably check with the
3142  block library whether the element we are given is well-formed */
3143  return GNUNET_OK;
3144 }
3145 
3146 
/**
 * Handle a client's request to add an element to its set.
 * If no entry is stored under the element's hash yet, a new entry holding a
 * copy of the data is created with the set's current generation and marked
 * as not remote, and its IBF key is inserted into the set's strata
 * estimator.  If an entry already exists, the request is ignored.
 * NOTE(review): missing listing lines include the function name (3154), the
 * statements at 3167 and 3170, the hash computation call (3175), log call
 * openers, the statement at 3189, and the opener and last argument of the
 * map insertion (3193-3194, 3198).
 *
 * @param cls the `struct ClientState *`
 * @param msg the element message; the element data follows the header
 */
3153 static void
3155  const struct GNUNET_SETU_ElementMessage *msg)
3156 {
3157  struct ClientState *cs = cls;
3158  struct Set *set;
3159  struct GNUNET_SETU_Element el;
3160  struct ElementEntry *ee;
3161  struct GNUNET_HashCode hash;
3162 
3163  if (NULL == (set = cs->set))
3164  {
3165  /* client without a set requested an operation */
3166  GNUNET_break (0);
3168  return;
3169  }
3171  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3172  el.size = ntohs (msg->header.size) - sizeof(*msg);
3173  el.data = &msg[1];
3174  el.element_type = ntohs (msg->element_type);
3176  &hash);
3177  ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3178  &hash);
3179  if (NULL == ee)
3180  {
3182  "Client inserts element %s of size %u\n",
3183  GNUNET_h2s (&hash),
3184  el.size);
  /* entry and element data share one allocation; data sits right after *ee */
3185  ee = GNUNET_malloc (el.size + sizeof(*ee));
3186  ee->element.size = el.size;
3187  GNUNET_memcpy (&ee[1], el.data, el.size);
3188  ee->element.data = &ee[1];
3190  ee->remote = GNUNET_NO;
3191  ee->generation = set->current_generation;
3192  ee->element_hash = hash;
3195  set->content->elements,
3196  &ee->element_hash,
3197  ee,
3199  }
3200  else
3201  {
3203  "Client inserted element %s of size %u twice (ignored)\n",
3204  GNUNET_h2s (&hash),
3205  el.size);
3206  /* same element inserted twice */
3207  return;
3208  }
3209  strata_estimator_insert (set->se,
3210  get_ibf_key (&ee->element_hash));
3211 }
3212 
3213 
/**
 * Advance the generation counters of a set: increments both the content's
 * latest generation and the set's current generation.
 * NOTE(review): the listing line with the function name and parameter
 * (3221) is missing; callers in this file invoke it as
 * advance_generation (set).
 */
3220 static void
3222 {
3223  set->content->latest_generation++;
3224  set->current_generation++;
3225 }
3226 
3227 
/**
 * Check an evaluate message from a client.  No validation is performed
 * here; the message is always accepted (see the FIXME in the body).
 * NOTE(review): the listing line with the function name and first parameter
 * (3238) is missing here.
 *
 * @param msg the evaluate message (not inspected)
 * @return always #GNUNET_OK
 */
3237 static int
3239  const struct GNUNET_SETU_EvaluateMessage *msg)
3240 {
3241  /* FIXME: suboptimal, even if the context below could be NULL,
3242  there are malformed messages this does not check for... */
3243  return GNUNET_OK;
3244 }
3245 
3246 
/**
 * Handle a client's request to start a set union operation with a remote
 * peer.  Allocates the operation, copies the options from the message,
 * advances the set's generation, creates a CADET channel to the target peer
 * and sends the operation request (with the optional nested context
 * message).  The operation then waits in PHASE_EXPECT_SE.
 * NOTE(review): the listing has lost the function name (3256), every
 * message-type argument and some struct arguments of the handler table, the
 * table terminator (3314), statements on the error paths (3323, 3367), the
 * call whose last argument is UINT32_MAX (3326), one argument of
 * GNUNET_CADET_channel_create (3352), the request message type (3361), the
 * calls ending at 3371 and 3397, log call openers and the final statement
 * (3400).
 *
 * @param cls the `struct ClientState *`
 * @param msg the evaluate message
 */
3255 static void
3257  const struct GNUNET_SETU_EvaluateMessage *msg)
3258 {
3259  struct ClientState *cs = cls;
3260  struct Operation *op = GNUNET_new (struct Operation);
3261  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3262  GNUNET_MQ_hd_var_size (incoming_msg,
3264  struct OperationRequestMessage,
3265  op),
3266  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3268  struct IBFMessage,
3269  op),
3270  GNUNET_MQ_hd_var_size (union_p2p_elements,
3273  op),
3274  GNUNET_MQ_hd_var_size (union_p2p_offer,
3276  struct GNUNET_MessageHeader,
3277  op),
3278  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3280  struct InquiryMessage,
3281  op),
3282  GNUNET_MQ_hd_var_size (union_p2p_demand,
3284  struct GNUNET_MessageHeader,
3285  op),
3286  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3288  struct GNUNET_MessageHeader,
3289  op),
3290  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3292  struct GNUNET_MessageHeader,
3293  op),
3294  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3296  struct GNUNET_MessageHeader,
3297  op),
3298  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3300  struct GNUNET_MessageHeader,
3301  op),
3302  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3304  struct StrataEstimatorMessage,
3305  op),
3306  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3308  struct StrataEstimatorMessage,
3309  op),
3310  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3313  op),
3315  };
3316  struct Set *set;
3317  const struct GNUNET_MessageHeader *context;
3318 
3319  if (NULL == (set = cs->set))
3320  {
3321  GNUNET_break (0);
3322  GNUNET_free (op);
3324  return;
3325  }
3327  UINT32_MAX);
3328  op->peer = msg->target_peer;
3329  op->client_request_id = ntohl (msg->request_id);
3330  op->byzantine = msg->byzantine;
3331  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3332  op->force_full = msg->force_full;
3333  op->force_delta = msg->force_delta;
3334  op->symmetric = msg->symmetric;
3335  context = GNUNET_MQ_extract_nested_mh (msg);
3336 
3337  /* Advance generation values, so that
3338  mutations won't interfere with the running operation. */
3339  op->set = set;
3340  op->generation_created = set->current_generation;
3341  advance_generation (set);
3342  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3343  set->ops_tail,
3344  op);
3346  "Creating new CADET channel to port %s for set union\n",
3347  GNUNET_h2s (&msg->app_id));
3348  op->channel = GNUNET_CADET_channel_create (cadet,
3349  op,
3350  &msg->target_peer,
3351  &msg->app_id,
3353  &channel_end_cb,
3354  cadet_handlers);
3355  op->mq = GNUNET_CADET_get_mq (op->channel);
3356  {
3357  struct GNUNET_MQ_Envelope *ev;
  /* NOTE: this local shadows the function parameter `msg` within the block */
3358  struct OperationRequestMessage *msg;
3359 
3360  ev = GNUNET_MQ_msg_nested_mh (msg,
3362  context);
3363  if (NULL == ev)
3364  {
3365  /* the context message is too large */
3366  GNUNET_break (0);
3368  return;
3369  }
3371  GNUNET_NO);
3372  /* copy the current generation's strata estimator for this operation */
3373  op->se = strata_estimator_dup (op->set->se);
3374  /* we started the operation, thus we have to send the operation request */
3375  op->phase = PHASE_EXPECT_SE;
3376  op->salt_receive = op->salt_send = 42; // FIXME?????
3378  "Initiating union operation evaluation\n");
3379  GNUNET_STATISTICS_update (_GSS_statistics,
3380  "# of total union operations",
3381  1,
3382  GNUNET_NO);
3383  GNUNET_STATISTICS_update (_GSS_statistics,
3384  "# of initiated union operations",
3385  1,
3386  GNUNET_NO);
3387  GNUNET_MQ_send (op->mq,
3388  ev);
3389  if (NULL != context)
3391  "sent op request with context message\n");
3392  else
3394  "sent op request without context message\n");
3397  op->key_to_element);
3398 
3399  }
3401 }
3402 
3403 
/**
 * Handle a client's request to cancel an operation, identified by the
 * request id in the message.  The client's set is searched for an operation
 * with that id; a missing operation is not treated as an error.
 * NOTE(review): missing listing lines include the function name (3411), the
 * statement on the error path (3423), log call openers, the statement that
 * acts on the found operation (3451) and the final statement (3453).
 *
 * @param cls the `struct ClientState *`
 * @param msg the cancel message
 */
3410 static void
3412  const struct GNUNET_SETU_CancelMessage *msg)
3413 {
3414  struct ClientState *cs = cls;
3415  struct Set *set;
3416  struct Operation *op;
3417  int found;
3418 
3419  if (NULL == (set = cs->set))
3420  {
3421  /* client without a set requested an operation */
3422  GNUNET_break (0);
3424  return;
3425  }
3426  found = GNUNET_NO;
3427  for (op = set->ops_head; NULL != op; op = op->next)
3428  {
3429  if (op->client_request_id == ntohl (msg->request_id))
3430  {
3431  found = GNUNET_YES;
3432  break;
3433  }
3434  }
3435  if (GNUNET_NO == found)
3436  {
3437  /* It may happen that the operation was already destroyed due to
3438  * the other peer disconnecting. The client may not know about this
3439  * yet and try to cancel the (just barely non-existent) operation.
3440  * So this is not a hard error.
3441  *///
3443  "Client canceled non-existent op %u\n",
3444  (uint32_t) ntohl (msg->request_id));
3445  }
3446  else
3447  {
3449  "Client requested cancel for op %u\n",
3450  (uint32_t) ntohl (msg->request_id));
3452  }
3454 }
3455 
3456 
/**
 * Handle a client's acceptance of an incoming operation.
 * If the operation no longer exists, a failure result is sent to the
 * client.  Otherwise the operation is bound to the client's set, the
 * options from the message are copied, the set's generation is advanced,
 * and the serialized strata estimator is sent to the remote peer; the
 * operation then waits in PHASE_EXPECT_IBF.
 * NOTE(review): missing listing lines include the function name (3466),
 * statements on the early-return paths (3480, 3499), log levels / openers,
 * the result message type (3495), the opener of the DLL removal (3507), the
 * calls ending at 3547 and 3551, both assignments to `type` (3559, 3561),
 * the right-hand side opener of `set_size` (3570) and the final statements
 * (3579-3580).
 *
 * @param cls the `struct ClientState *`
 * @param msg the accept message
 */
3465 static void
3467  const struct GNUNET_SETU_AcceptMessage *msg)
3468 {
3469  struct ClientState *cs = cls;
3470  struct Set *set;
3471  struct Operation *op;
3472  struct GNUNET_SETU_ResultMessage *result_message;
3473  struct GNUNET_MQ_Envelope *ev;
3474  struct Listener *listener;
3475 
3476  if (NULL == (set = cs->set))
3477  {
3478  /* client without a set requested to accept */
3479  GNUNET_break (0);
3481  return;
3482  }
3483  op = get_incoming (ntohl (msg->accept_reject_id));
3484  if (NULL == op)
3485  {
3486  /* It is not an error if the set op does not exist -- it may
3487  * have been destroyed when the partner peer disconnected. */
3488  GNUNET_log (
3490  "Client %p accepted request %u of listener %p that is no longer active\n",
3491  cs,
3492  ntohl (msg->accept_reject_id),
3493  cs->listener);
3494  ev = GNUNET_MQ_msg (result_message,
3496  result_message->request_id = msg->request_id;
3497  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3498  GNUNET_MQ_send (set->cs->mq, ev);
3500  return;
3501  }
3503  "Client accepting request %u\n",
3504  (uint32_t) ntohl (msg->accept_reject_id));
3505  listener = op->listener;
3506  op->listener = NULL;
3508  listener->op_tail,
3509  op);
3510  op->set = set;
3511  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3512  set->ops_tail,
3513  op);
3514  op->client_request_id = ntohl (msg->request_id);
3515  op->byzantine = msg->byzantine;
3516  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3517  op->force_full = msg->force_full;
3518  op->force_delta = msg->force_delta;
3519  op->symmetric = msg->symmetric;
3520 
3521  /* Advance generation values, so that future mutations do not
3522  interfere with the running operation. */
3523  op->generation_created = set->current_generation;
3524  advance_generation (set);
3525  GNUNET_assert (NULL == op->se);
3526 
3528  "accepting set union operation\n");
3529  GNUNET_STATISTICS_update (_GSS_statistics,
3530  "# of accepted union operations",
3531  1,
3532  GNUNET_NO);
3533  GNUNET_STATISTICS_update (_GSS_statistics,
3534  "# of total union operations",
3535  1,
3536  GNUNET_NO);
3537  {
3538  const struct StrataEstimator *se;
  /* NOTE: this local shadows the function-level `ev` within the block */
3539  struct GNUNET_MQ_Envelope *ev;
3540  struct StrataEstimatorMessage *strata_msg;
3541  char *buf;
3542  size_t len;
3543  uint16_t type;
3544 
3545  op->se = strata_estimator_dup (op->set->se);
3547  GNUNET_NO);
3548  op->salt_receive = op->salt_send = 42; // FIXME?????
3551  op->key_to_element);
3552 
3553  /* kick off the operation */
3554  se = op->se;
3555  buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3556  len = strata_estimator_write (se,
3557  buf);
  /* the message type depends on whether the written size is below the raw size */
3558  if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3560  else
3562  ev = GNUNET_MQ_msg_extra (strata_msg,
3563  len,
3564  type);
3565  GNUNET_memcpy (&strata_msg[1],
3566  buf,
3567  len);
3568  GNUNET_free (buf);
3569  strata_msg->set_size
3571  op->set->content->elements));
3572  GNUNET_MQ_send (op->mq,
3573  ev);
3574  op->phase = PHASE_EXPECT_IBF;
3575  }
3576  /* Now allow CADET to continue, as we did not do this in
3577  #handle_incoming_msg (as we wanted to first see if the
3578  local client would accept the request). */
3581 }
3582 
3583 
/**
 * Shutdown task: disconnects from CADET if no clients remain and destroys
 * the statistics handle.
 * NOTE(review): listing lines 3593 (the statement following the "delay
 * actual shutdown" comment) and 3604 (log call opener) are missing here.
 *
 * @param cls closure (unused)
 */
3589 static void
3590 shutdown_task (void *cls)
3591 {
3592  /* Delay actual shutdown to allow service to disconnect clients */
3594  if (0 == num_clients)
3595  {
3596  if (NULL != cadet)
3597  {
3598  GNUNET_CADET_disconnect (cadet);
3599  cadet = NULL;
3600  }
3601  }
3602  GNUNET_STATISTICS_destroy (_GSS_statistics,
3603  GNUNET_YES);
3605  "handled shutdown request\n");
3606 }
3607 
3608 
/**
 * Service start-up: creates the statistics handle and connects to CADET.
 * NOTE(review): missing listing lines include the third parameter (3620),
 * the call whose last argument is NULL (3625), the log call opener (3632)
 * and the statement before the early return on CADET failure (3634).
 *
 * @param cls closure (unused in the visible code)
 * @param cfg configuration to use
 */
3617 static void
3618 run (void *cls,
3619  const struct GNUNET_CONFIGURATION_Handle *cfg,
3621 {
3622  /* FIXME: need to modify SERVICE (!) API to allow
3623  us to run a shutdown task *after* clients were
3624  forcefully disconnected! */
3626  NULL);
3627  _GSS_statistics = GNUNET_STATISTICS_create ("setu",
3628  cfg);
3629  cadet = GNUNET_CADET_connect (cfg);
3630  if (NULL == cadet)
3631  {
3633  _ ("Could not connect to CADET service\n"));
3635  return;
3636  }
3637 }
3638 
3639 
3644  "set",
3646  &run,
3649  NULL,
3650  GNUNET_MQ_hd_fixed_size (client_accept,
3653  NULL),
3654  GNUNET_MQ_hd_var_size (client_set_add,
3657  NULL),
3658  GNUNET_MQ_hd_fixed_size (client_create_set,
3661  NULL),
3662  GNUNET_MQ_hd_var_size (client_evaluate,
3665  NULL),
3666  GNUNET_MQ_hd_fixed_size (client_listen,
3669  NULL),
3670  GNUNET_MQ_hd_fixed_size (client_reject,
3673  NULL),
3674  GNUNET_MQ_hd_fixed_size (client_cancel,
3677  NULL),
3679 
3680 
3681 /* end of gnunet-service-setu.c */
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
Context for op_get_element_iterator.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
uint32_t offset
Offset of the strata in the rest of the message.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
struct Set * prev
Sets are held in a doubly linked list.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet...
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:55
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:775
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
struct Set * set
Set, if associated with the client, otherwise NULL.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
struct StrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
After sending the full set, wait for responses with the elements that the local peer is missing...
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
State we keep per client.
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
enum IntersectionOperationPhase phase
Current state of the operation.
static unsigned int element_size
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
uint8_t reserved1
Padding, must be 0.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
static int byzantine
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
struct Operation * op
Operation for which the elements should be sent.
uint32_t element_count
For Intersection: my element count.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Opaque handle to the service.
Definition: cadet_api.c:38
Handle to a service.
Definition: service.c:116
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct...
int byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct GNUNET_HashCode app_id
Application id.
Definition: setu.h:197
Element should be added to the result set of the remote peer, i.e.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
const void * data
Actual data of the element.
uint8_t symmetric
Also return set elements we are sending to the remote peer.
Definition: setu.h:220
uint16_t reserved
For alignment, always zero.
Definition: setu.h:290
uint32_t accept_reject_id
ID of the incoming request we want to reject.
Definition: setu.h:141
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1331
struct GNUNET_HashCode element_hash
Hash of the element.
unsigned int refcount
Number of references to the content.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
Message containing buckets of an invertible bloom filter.
struct Listener * next
Listeners are held in a doubly linked list.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
uint32_t request_id
ID of the request we want to cancel.
Definition: setu.h:309
Invertible bloom filter (IBF).
Definition: ibf.h:82
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
UnionOperationPhase
Current phase we are in for a union operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
static void handle_union_p2p_request_full(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request for full set transmission.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
uint32_t accept_reject_id
ID of the incoming request we want to accept.
Definition: setu.h:88
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
struct KeyEntry * k
FIXME.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
uint64_t initial_size
Initial size of our set, just before the operation started.
Opaque handle to a channel.
Definition: cadet.h:116
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint64_t key_val
Definition: ibf.h:47
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
unsigned int ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
A listener is inhabited by a client, and waits for evaluation requests from remote peers...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
uint32_t request_id
id the result belongs to
Definition: setu.h:254
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:290
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL)...
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
uint32_t request_id
Request ID to identify responses.
Definition: setu.h:93
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:111
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:531
Handle for the service.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static unsigned int force_full
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int send_ibf(struct Operation *op, uint16_t ibf_order)
Send an ibf of appropriate size.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:265
Message sent by the client to the service to ask starting a new set to perform operations with...
Definition: setu.h:40
Internal representation of the hash map.
#define SE_IBF_SIZE
Size of the IBFs in the strata estimator.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up...
unsigned int generation_created
Generation in which the operation handle was created.
static unsigned int num_clients
Number of active clients.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected...
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
static pa_context * context
Pulseaudio context.
const void * data
Actual data of the element.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:910
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
A handle to a strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:209
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
#define _(String)
GNU gettext support macro.
Definition: platform.h:184
The key entry is used to associate an ibf key with an element.
Handle to a client that is connected to a service.
Definition: service.c:250
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
uint8_t symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
Definition: setu.h:117
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:379
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:285
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
Message sent by client to service to initiate a set operation as a client (not as listener)...
Definition: setu.h:176
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
The other peer is decoding the IBF we just sent.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation...
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
The other peer refused to do the operation with us, or something went wrong.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
GNUNET_SETU_Status
Status for the result callback.
struct StrataEstimator * se
The strata estimator is only generated once for each set.
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:970
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
Do not bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static char * value
Value of the record to add/remove.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
Information about an element in the set.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
Success, all elements have been sent (and received).
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set...
uint16_t element_type
Application-specific element type.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
We are decoding an IBF.
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:228
uint8_t order
Order of the whole ibf, where num_buckets = 2^order.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
Element should be added to the result set of the local peer, i.e.
We sent the strata estimator, and expect an IBF.
A set that supports a specific operation with other peers.
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: setu.h:159
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer...
struct ElementEntry * element
The actual element associated with the key.
Randomness for IVs etc.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
uint16_t status
See PRISM_STATUS_*-constants.
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
static char buf[2048]
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set...
In the penultimate phase, we wait until all our demands are satisfied.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:226
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:356
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
unsigned int strata_count
Size of the IBF array in strata.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
Internal representation of the hash map.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
A 512-bit hashcode.
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:260
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:78
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2325
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:203
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:876
Message handler for a specific message type.
static int res
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:215
#define LOG(kind,...)
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
int force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:323
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
uint64_t current_size
Current set size.
Definition: setu.h:249
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space...
Definition: gnunet_mq_lib.h:88
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:403
struct ClientState * cs
Client that owns the set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set...
char * getenv()
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries...
Strata estimator together with the peer's overall set size.
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:79
#define IBF_ALPHA
Number of buckets used in the ibf per estimated difference.
static struct GNUNET_SCHEDULER_Task * timeout_task
Task to be run on timeout.
Definition: gnunet-arm.c:124
Message sent by client to the service to add an element to the set.
Definition: setu.h:275
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
int client_done_sent
Did we send the client that we are done?
struct GNUNET_PeerIdentity target_peer
Peer to evaluate the operation with.
Definition: setu.h:192
Operation context used to execute a set operation.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
A request for an operation with another client.
Definition: setu.h:148
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
uint16_t reserved2
Padding, must be 0.
uint32_t request_id
Id of our set to evaluate, chosen implicitly by the client when it calls GNUNET_SETU_commit().
Definition: setu.h:187
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
Allow multiple values with the same key.
unsigned int generation
First generation that includes this element.
Handle to a message queue.
Definition: mq.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:167
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
The identity of the host (wraps the signing key of the peer).
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:164
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:888
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:105
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1031
struct GNUNET_CADET_Channel * channel
Channel to the peer.
uint32_t salt
Salt used when hashing elements for this IBF.
struct Operation * ops_head
Evaluate operations are held in a linked list.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:239
configuration data
Definition: configuration.c:84
uint32_t salt
Salt to use for the operation.
struct GNUNET_SERVICE_Client * client
Client this is about.
const char * name
uint32_t client_request_id
ID used to identify an operation between service and client.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:311
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's elements.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:299
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET...
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
#define GNUNET_log(kind,...)
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
Entry in list of pending tasks.
Definition: scheduler.c:134
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define MAX_IBF_ORDER
The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
struct GNUNET_SET_Element element
The actual element.
Opaque handle to a port.
Definition: cadet_api.c:79
struct Listener * prev
Listeners are held in a doubly linked list.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
Header for all communications.
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port().
Definition: cadet_api.c:808
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:99
static unsigned int get_order_from_difference(unsigned int diff)
Compute the necessary order of an ibf from the size of the symmetric set difference.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:280
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
struct ClientState * cs
Client that owns the listener.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
uint32_t remote_element_count
Remote peer's element count.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:45
uint32_t salt
Salt used when hashing elements for this inquiry.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:123
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
Continuation for multi part IBFs.
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
uint32_t salt_send
Salt that we're using for sending IBFs.
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:837
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
We sent the request message, and expect a strata estimator.
struct Listener * listener
Port this operation runs on.
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:90
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2244
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
Used as a closure for sending elements with a specific IBF key.
struct GNUNET_HashCode hash
FIXME.
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:131
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
#define GNUNET_malloc(size)
Wrapper around malloc.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
uint32_t received_total
Total number of elements received from the other peer.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:53
static unsigned int force_delta
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1082
int force_full
Always send full sets, even if delta operations would be more efficient.
struct GNUNET_HashCode app_id
application id
Definition: setu.h:70
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
Element stored in a set.
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
unsigned int ibf_size
Size of each IBF stratum (in bytes)