GNUnet  0.11.x
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
29 #include "ibf.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_cadet_service.h"
36 #include <gcrypt.h>
37 #include "gnunet_setu_service.h"
38 #include "setu.h"
39 
40 #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
41 
46 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
47 
51 #define SE_STRATA_COUNT 32
52 
56 #define SE_IBF_SIZE 80
57 
61 #define SE_IBF_HASH_NUM 4
62 
66 #define MAX_BUCKETS_PER_MESSAGE ((1 << 15) / IBF_BUCKET_SIZE)
67 
73 #define MAX_IBF_ORDER (20)
74 
79 #define IBF_ALPHA 4
80 
81 
86 {
91 
102 
107 
112 
117 
123 
129 
135 
141 
147 };
148 
149 
156 struct ElementEntry
157 {
163 
169 
173  unsigned int generation;
174 
179  int remote;
180 };
181 
182 
187 struct Listener;
188 
189 
193 struct Set;
194 
195 
199 struct ClientState
200 {
204  struct Set *set;
205 
209  struct Listener *listener;
210 
214  struct GNUNET_SERVICE_Client *client;
215 
219  struct GNUNET_MQ_Handle *mq;
220 };
221 
222 
226 struct Operation
227 {
228 
233  struct GNUNET_PeerIdentity peer;
234 
238  uint64_t initial_size;
239 
243  struct Operation *next;
244 
248  struct Operation *prev;
249 
253  struct GNUNET_CADET_Channel *channel;
254 
258  struct Listener *listener;
259 
263  struct GNUNET_MQ_Handle *mq;
264 
268  struct GNUNET_MessageHeader *context_msg;
269 
274  struct Set *set;
275 
281 
286 
291 
298 
304 
309 
314 
318  int client_done_sent;
319 
323  unsigned int ibf_buckets_received;
324 
328  uint32_t salt_send;
329 
333  uint32_t salt_receive;
334 
339  uint32_t received_fresh;
340 
344  uint32_t received_total;
345 
349  uint32_t salt;
350 
354  uint32_t remote_element_count;
355 
359  uint32_t client_request_id;
360 
365  int force_delta;
366 
371  int force_full;
372 
377  int byzantine;
378 
384 
389  int byzantine_lower_bound;
390 
396  uint32_t suggest_id;
397 
402  unsigned int generation_created;
403 
408 
413 
418 
419 };
420 
421 
426 struct SetContent
427 {
431  struct GNUNET_CONTAINER_MultiHashMap *elements;
432 
436  unsigned int refcount;
437 
441  unsigned int latest_generation;
442 
446  int iterator_count;
447 };
448 
449 
453 struct Set
454 {
458  struct Set *next;
459 
463  struct Set *prev;
464 
469  struct ClientState *cs;
470 
475  struct SetContent *content;
476 
482 
486  struct Operation *ops_head;
487 
491  struct Operation *ops_tail;
492 
497  unsigned int current_generation;
498 
499 };
500 
501 
505 struct KeyEntry
506 {
510  struct IBF_Key ibf_key;
511 
518  struct ElementEntry *element;
519 
525  int received;
526 };
527 
528 
533 struct SendElementClosure
534 {
539  struct IBF_Key ibf_key;
540 
545  struct Operation *op;
546 };
547 
548 
553 struct Listener
554 {
558  struct Listener *next;
559 
563  struct Listener *prev;
564 
570  struct Operation *op_head;
571 
577  struct Operation *op_tail;
578 
583  struct ClientState *cs;
584 
588  struct GNUNET_CADET_Port *open_port;
589 
594  struct GNUNET_HashCode app_id;
595 
596 };
597 
598 
603 static struct GNUNET_CADET_Handle *cadet;
604 
609 
613 static struct Listener *listener_head;
614 
618 static struct Listener *listener_tail;
619 
623 static unsigned int num_clients;
624 
629 static int in_shutdown;
630 
636 static uint32_t suggest_id;
637 
638 
645  int sent;
647  int received;
649 };
650 
651 
653 {
654  struct perf_num_send_resived_msg operation_request;
656  struct perf_num_send_resived_msg request_full;
657  struct perf_num_send_resived_msg element_full;
658  struct perf_num_send_resived_msg full_done;
666 };
667 
669 
670 
671 static int
673  return (size * perf_rtt_struct.sent) +
674  (size * perf_rtt_struct.received) +
675  perf_rtt_struct.sent_var_bytes +
676  perf_rtt_struct.received_var_bytes;
677 }
678 
679 static float
684  float rtt = 1;
685  int bytes_transmitted = 0;
686 
690  if (( perf_rtt.element_full.received != 0 ) ||
691  ( perf_rtt.element_full.sent != 0)
692  ) rtt += 1;
693 
694  if (( perf_rtt.request_full.received != 0 ) ||
695  ( perf_rtt.request_full.sent != 0)
696  ) rtt += 0.5;
697 
703  int iterations = perf_rtt.ibf.received;
704  if(iterations > 1)
705  rtt += (iterations - 1 ) * 0.5;
706  rtt += 3 * iterations;
707 
722 
723  LOG(GNUNET_ERROR_TYPE_ERROR,"Bytes Transmitted: %d\n", bytes_transmitted);
724 
725  LOG(GNUNET_ERROR_TYPE_ERROR,"Reached tradeoff bandwidth/rtt: %f\n", (bytes_transmitted / rtt ));
726 
727  return rtt;
728 }
729 
730 
741 static int
743  uint32_t key,
744  void *value)
745 {
746  struct KeyEntry *k = value;
747 
748  GNUNET_assert (NULL != k);
749  if (GNUNET_YES == k->element->remote)
750  {
751  GNUNET_free (k->element);
752  k->element = NULL;
753  }
754  GNUNET_free (k);
755  return GNUNET_YES;
756 }
757 
758 
765 static void
766 send_client_done (void *cls)
767 {
768  struct Operation *op = cls;
769  struct GNUNET_MQ_Envelope *ev;
770  struct GNUNET_SETU_ResultMessage *rm;
771 
772  if (GNUNET_YES == op->client_done_sent)
773  return;
774  if (PHASE_FINISHED != op->phase)
775  {
777  "Union operation failed\n");
778  GNUNET_STATISTICS_update (_GSS_statistics,
779  "# Union operations failed",
780  1,
781  GNUNET_NO);
784  rm->request_id = htonl (op->client_request_id);
785  rm->element_type = htons (0);
786  GNUNET_MQ_send (op->set->cs->mq,
787  ev);
788  return;
789  }
790 
792 
793  GNUNET_STATISTICS_update (_GSS_statistics,
794  "# Union operations succeeded",
795  1,
796  GNUNET_NO);
798  "Signalling client that union operation is done\n");
799  ev = GNUNET_MQ_msg (rm,
801  rm->request_id = htonl (op->client_request_id);
803  rm->element_type = htons (0);
805  op->key_to_element));
806  GNUNET_MQ_send (op->set->cs->mq,
807  ev);
808 }
809 
810 
811 /* FIXME: the destroy logic is a mess and should be cleaned up! */
812 
825 static void
827 {
828  struct Set *set = op->set;
829  struct GNUNET_CADET_Channel *channel;
830 
832  "Destroying union operation %p\n",
833  op);
834  GNUNET_assert (NULL == op->listener);
835  /* check if the op was canceled twice */
836  if (NULL != op->remote_ibf)
837  {
838  ibf_destroy (op->remote_ibf);
839  op->remote_ibf = NULL;
840  }
841  if (NULL != op->demanded_hashes)
842  {
844  op->demanded_hashes = NULL;
845  }
846  if (NULL != op->local_ibf)
847  {
848  ibf_destroy (op->local_ibf);
849  op->local_ibf = NULL;
850  }
851  if (NULL != op->se)
852  {
854  op->se = NULL;
855  }
856  if (NULL != op->key_to_element)
857  {
860  NULL);
862  op->key_to_element = NULL;
863  }
864  if (NULL != set)
865  {
866  GNUNET_CONTAINER_DLL_remove (set->ops_head,
867  set->ops_tail,
868  op);
869  op->set = NULL;
870  }
871  if (NULL != op->context_msg)
872  {
873  GNUNET_free (op->context_msg);
874  op->context_msg = NULL;
875  }
876  if (NULL != (channel = op->channel))
877  {
878  /* This will free op; called conditionally as this helper function
879  is also called from within the channel disconnect handler. */
880  op->channel = NULL;
882  }
883  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
884  * there was a channel end handler that will free 'op' on the call stack. */
885 }
886 
887 
893 static void
895 
896 
902 static void
904 {
905  struct Listener *listener;
906 
908  "Destroying incoming operation %p\n",
909  op);
910  if (NULL != (listener = op->listener))
911  {
913  listener->op_tail,
914  op);
915  op->listener = NULL;
916  }
917  if (NULL != op->timeout_task)
918  {
920  op->timeout_task = NULL;
921  }
923 }
924 
925 
931 static void
933 {
934  struct GNUNET_CADET_Channel *channel;
935 
936  if (NULL != (channel = op->channel))
937  {
938  /* This will free op; called conditionally as this helper function
939  is also called from within the channel disconnect handler. */
940  op->channel = NULL;
942  }
943  if (NULL != op->listener)
944  {
945  incoming_destroy (op);
946  return;
947  }
948  if (NULL != op->set)
949  send_client_done (op);
951  GNUNET_free (op);
952 }
953 
954 
961 static void
963 {
964  struct GNUNET_MQ_Envelope *ev;
966 
968  "union operation failed\n");
971  msg->request_id = htonl (op->client_request_id);
972  msg->element_type = htons (0);
973  GNUNET_MQ_send (op->set->cs->mq,
974  ev);
976 }
977 
978 
986 static struct IBF_Key
987 get_ibf_key (const struct GNUNET_HashCode *src)
988 {
989  struct IBF_Key key;
990  uint16_t salt = 0;
991 
993  GNUNET_CRYPTO_kdf (&key, sizeof(key),
994  src, sizeof *src,
995  &salt, sizeof(salt),
996  NULL, 0));
997  return key;
998 }
999 
1000 
1004 struct GetElementContext
1005 {
1009  struct GNUNET_HashCode hash;
1010 
1014  struct KeyEntry *k;
1015 };
1016 
1017 
1028 static int
1030  uint32_t key,
1031  void *value)
1032 {
1033  struct GetElementContext *ctx = cls;
1034  struct KeyEntry *k = value;
1035 
1036  GNUNET_assert (NULL != k);
1038  &ctx->hash))
1039  {
1040  ctx->k = k;
1041  return GNUNET_NO;
1042  }
1043  return GNUNET_YES;
1044 }
1045 
1046 
1055 static struct KeyEntry *
1057  const struct GNUNET_HashCode *element_hash)
1058 {
1059  int ret;
1060  struct IBF_Key ibf_key;
1061  struct GetElementContext ctx = { { { 0 } }, 0 };
1062 
1063  ctx.hash = *element_hash;
1064 
1065  ibf_key = get_ibf_key (element_hash);
1067  (uint32_t) ibf_key.key_val,
1069  &ctx);
1070 
1071  /* was the iteration aborted because we found the element? */
1072  if (GNUNET_SYSERR == ret)
1073  {
1074  GNUNET_assert (NULL != ctx.k);
1075  return ctx.k;
1076  }
1077  return NULL;
1078 }
1079 
1080 
1095 static void
1097  struct ElementEntry *ee,
1098  int received)
1099 {
1100  struct IBF_Key ibf_key;
1101  struct KeyEntry *k;
1102 
1103  ibf_key = get_ibf_key (&ee->element_hash);
1104  k = GNUNET_new (struct KeyEntry);
1105  k->element = ee;
1106  k->ibf_key = ibf_key;
1107  k->received = received;
1110  (uint32_t) ibf_key.key_val,
1111  k,
1113 }
1114 
1115 
1120 static void
1121 salt_key (const struct IBF_Key *k_in,
1122  uint32_t salt,
1123  struct IBF_Key *k_out)
1124 {
1125  int s = salt % 64;
1126  uint64_t x = k_in->key_val;
1127 
1128  /* rotate ibf key */
1129  x = (x >> s) | (x << (64 - s));
1130  k_out->key_val = x;
1131 }
1132 
1133 
1137 static void
1138 unsalt_key (const struct IBF_Key *k_in,
1139  uint32_t salt,
1140  struct IBF_Key *k_out)
1141 {
1142  int s = salt % 64;
1143  uint64_t x = k_in->key_val;
1144 
1145  x = (x << s) | (x >> (64 - s));
1146  k_out->key_val = x;
1147 }
1148 
1149 
1157 static int
1159  uint32_t key,
1160  void *value)
1161 {
1162  struct Operation *op = cls;
1163  struct KeyEntry *ke = value;
1164  struct IBF_Key salted_key;
1165 
1167  "[OP %p] inserting %lx (hash %s) into ibf\n",
1168  op,
1169  (unsigned long) ke->ibf_key.key_val,
1170  GNUNET_h2s (&ke->element->element_hash));
1171  salt_key (&ke->ibf_key,
1172  op->salt_send,
1173  &salted_key);
1174  ibf_insert (op->local_ibf, salted_key);
1175  return GNUNET_YES;
1176 }
1177 
1178 
1186 static int
1188  struct Operation *op)
1189 {
1190  return ee->generation >= op->generation_created;
1191 }
1192 
1193 
1204 static int
1206  const struct GNUNET_HashCode *key,
1207  void *value)
1208 {
1209  struct Operation *op = cls;
1210  struct ElementEntry *ee = value;
1211 
1212  /* make sure that the element belongs to the set at the time
1213  * of creating the operation */
1214  if (GNUNET_NO ==
1216  op))
1217  return GNUNET_YES;
1218  GNUNET_assert (GNUNET_NO == ee->remote);
1219  op_register_element (op,
1220  ee,
1221  GNUNET_NO);
1222  return GNUNET_YES;
1223 }
1224 
1225 
1231 static void
1233 {
1234  unsigned int len;
1235 
1236  GNUNET_assert (NULL == op->key_to_element);
1241  op);
1242 }
1243 
1244 
1253 static int
1255  uint32_t size)
1256 {
1257  GNUNET_assert (NULL != op->key_to_element);
1258 
1259  if (NULL != op->local_ibf)
1260  ibf_destroy (op->local_ibf);
1261  op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
1262  if (NULL == op->local_ibf)
1263  {
1265  "Failed to allocate local IBF\n");
1266  return GNUNET_SYSERR;
1267  }
1270  op);
1271  return GNUNET_OK;
1272 }
1273 
1274 
1284 static int
1285 send_ibf (struct Operation *op,
1286  uint16_t ibf_order)
1287 {
1288  unsigned int buckets_sent = 0;
1289  struct InvertibleBloomFilter *ibf;
1290 
1291  if (GNUNET_OK !=
1292  prepare_ibf (op, 1 << ibf_order))
1293  {
1294  /* allocation failed */
1295  return GNUNET_SYSERR;
1296  }
1297 
1299  "sending ibf of size %u\n",
1300  1 << ibf_order);
1301 
1302  {
1303  char name[64];
1304  GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_order);
1305  GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
1306  }
1307 
1308  ibf = op->local_ibf;
1309 
1310  while (buckets_sent < (1 << ibf_order))
1311  {
1312  unsigned int buckets_in_message;
1313  struct GNUNET_MQ_Envelope *ev;
1314  struct IBFMessage *msg;
1315 
1316  buckets_in_message = (1 << ibf_order) - buckets_sent;
1317  /* limit to maximum */
1318  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
1319  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
1320 
1321  perf_rtt.ibf.sent += 1;
1322  perf_rtt.ibf.sent_var_bytes += ( buckets_in_message * IBF_BUCKET_SIZE );
1323  ev = GNUNET_MQ_msg_extra (msg,
1324  buckets_in_message * IBF_BUCKET_SIZE,
1326  msg->reserved1 = 0;
1327  msg->reserved2 = 0;
1328  msg->order = ibf_order;
1329  msg->offset = htonl (buckets_sent);
1330  msg->salt = htonl (op->salt_send);
1331  ibf_write_slice (ibf, buckets_sent,
1332  buckets_in_message, &msg[1]);
1333  buckets_sent += buckets_in_message;
1335  "ibf chunk size %u, %u/%u sent\n",
1336  buckets_in_message,
1337  buckets_sent,
1338  1 << ibf_order);
1339  GNUNET_MQ_send (op->mq, ev);
1340  }
1341 
1342  /* The other peer must decode the IBF, so
1343  * we're passive. */
1345  return GNUNET_OK;
1346 }
1347 
1348 
1356 static unsigned int
1357 get_order_from_difference (unsigned int diff)
1358 {
1359  unsigned int ibf_order;
1360 
1361  ibf_order = 2;
1362  while (((1 << ibf_order) < (IBF_ALPHA * diff) ||
1363  ((1 << ibf_order) < SE_IBF_HASH_NUM)) &&
1364  (ibf_order < MAX_IBF_ORDER))
1365  ibf_order++;
1366  // add one for correction
1367  return ibf_order + 1;
1368 }
1369 
1370 
1380 static int
1382  const struct GNUNET_HashCode *key,
1383  void *value)
1384 {
1385  struct Operation *op = cls;
1386  struct GNUNET_SETU_ElementMessage *emsg;
1387  struct ElementEntry *ee = value;
1388  struct GNUNET_SETU_Element *el = &ee->element;
1389  struct GNUNET_MQ_Envelope *ev;
1390 
1392  "Sending element %s\n",
1393  GNUNET_h2s (key));
1396  ev = GNUNET_MQ_msg_extra (emsg,
1397  el->size,
1399  emsg->element_type = htons (el->element_type);
1400  GNUNET_memcpy (&emsg[1],
1401  el->data,
1402  el->size);
1403  GNUNET_MQ_send (op->mq,
1404  ev);
1405  return GNUNET_YES;
1406 }
1407 
1408 
1414 static void
1416 {
1417  struct GNUNET_MQ_Envelope *ev;
1418 
1419  op->phase = PHASE_FULL_SENDING;
1421  "Dedicing to transmit the full set\n");
1422  /* FIXME: use a more memory-friendly way of doing this with an
1423  iterator, just as we do in the non-full case! */
1426  op);
1427  perf_rtt.full_done.sent += 1;
1429  GNUNET_MQ_send (op->mq,
1430  ev);
1431 }
1432 
1433 
1440 static int
1442  const struct StrataEstimatorMessage *msg)
1443 {
1444  struct Operation *op = cls;
1445  int is_compressed;
1446  size_t len;
1447 
1448  if (op->phase != PHASE_EXPECT_SE)
1449  {
1450  GNUNET_break (0);
1451  return GNUNET_SYSERR;
1452  }
1453  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1454  msg->header.type));
1455  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1456  if ((GNUNET_NO == is_compressed) &&
1458  {
1459  GNUNET_break (0);
1460  return GNUNET_SYSERR;
1461  }
1462  return GNUNET_OK;
1463 }
1464 
1465 
1472 static void
1474  const struct StrataEstimatorMessage *msg)
1475 {
1476  perf_rtt.se.received += 1;
1477  perf_rtt.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1478  struct Operation *op = cls;
1479  struct StrataEstimator *remote_se;
1480  unsigned int diff;
1481  uint64_t other_size;
1482  size_t len;
1483  int is_compressed;
1484 
1485  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
1486  msg->header.type));
1487  GNUNET_STATISTICS_update (_GSS_statistics,
1488  "# bytes of SE received",
1489  ntohs (msg->header.size),
1490  GNUNET_NO);
1491  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
1492  other_size = GNUNET_ntohll (msg->set_size);
1494  SE_IBF_SIZE,
1495  SE_IBF_HASH_NUM);
1496  if (NULL == remote_se)
1497  {
1498  /* insufficient resources, fail */
1499  fail_union_operation (op);
1500  return;
1501  }
1502  if (GNUNET_OK !=
1503  strata_estimator_read (&msg[1],
1504  len,
1505  is_compressed,
1506  remote_se))
1507  {
1508  /* decompression failed */
1509  strata_estimator_destroy (remote_se);
1510  fail_union_operation (op);
1511  return;
1512  }
1513  GNUNET_assert (NULL != op->se);
1514  diff = strata_estimator_difference (remote_se,
1515  op->se);
1516 
1517  if (diff > 200)
1518  diff = diff * 3 / 2;
1519 
1520  strata_estimator_destroy (remote_se);
1522  op->se = NULL;
1524  "got se diff=%d, using ibf size %d\n",
1525  diff,
1526  1U << get_order_from_difference (diff));
1527 
1528  {
1529  char *set_debug;
1530 
1531  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
1532  if ((NULL != set_debug) &&
1533  (0 == strcmp (set_debug, "1")))
1534  {
1535  FILE *f = fopen ("set.log", "a");
1536  fprintf (f, "%llu\n", (unsigned long long) diff);
1537  fclose (f);
1538  }
1539  }
1540 
1541  if ((GNUNET_YES == op->byzantine) &&
1542  (other_size < op->byzantine_lower_bound))
1543  {
1544  GNUNET_break (0);
1545  fail_union_operation (op);
1546  return;
1547  }
1548 
1550  "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: %f\n", op->rtt_bandwidth_tradeoff);
1551 
1552 
1556  if ((GNUNET_YES == op->force_full) ||
1557  (diff > op->initial_size / 4) ||
1558  (0 == other_size))
1559  {
1561  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
1562  diff,
1563  (unsigned long long) op->initial_size);
1564  GNUNET_STATISTICS_update (_GSS_statistics,
1565  "# of full sends",
1566  1,
1567  GNUNET_NO);
1568  if ((op->initial_size <= other_size) ||
1569  (0 == other_size))
1570  {
1571  send_full_set (op);
1572  }
1573  else
1574  {
1575  struct GNUNET_MQ_Envelope *ev;
1576 
1578  "Telling other peer that we expect its full set\n");
1580  perf_rtt.request_full.sent += 1;
1581  ev = GNUNET_MQ_msg_header (
1583  GNUNET_MQ_send (op->mq,
1584  ev);
1585  }
1586  }
1587  else
1588  {
1589  GNUNET_STATISTICS_update (_GSS_statistics,
1590  "# of ibf sends",
1591  1,
1592  GNUNET_NO);
1593  if (GNUNET_OK !=
1594  send_ibf (op,
1595  get_order_from_difference (diff)))
1596  {
1597  /* Internal error, best we can do is shut the connection */
1599  "Failed to send IBF, closing connection\n");
1600  fail_union_operation (op);
1601  return;
1602  }
1603  }
1605 }
1606 
1607 
1615 static int
1617  uint32_t key,
1618  void *value)
1619 {
1620  struct SendElementClosure *sec = cls;
1621  struct Operation *op = sec->op;
1622  struct KeyEntry *ke = value;
1623  struct GNUNET_MQ_Envelope *ev;
1624  struct GNUNET_MessageHeader *mh;
1625 
1626  /* Detect 32-bit key collision for the 64-bit IBF keys. */
1627  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
1628  return GNUNET_YES;
1629 
1630  perf_rtt.offer.sent += 1;
1631  perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
1632 
1633  ev = GNUNET_MQ_msg_header_extra (mh,
1634  sizeof(struct GNUNET_HashCode),
1636 
1637  GNUNET_assert (NULL != ev);
1638  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
1640  "[OP %p] sending element offer (%s) to peer\n",
1641  op,
1642  GNUNET_h2s (&ke->element->element_hash));
1643  GNUNET_MQ_send (op->mq, ev);
1644  return GNUNET_YES;
1645 }
1646 
1647 
1654 static void
1656  struct IBF_Key ibf_key)
1657 {
1658  struct SendElementClosure send_cls;
1659 
1660  send_cls.ibf_key = ibf_key;
1661  send_cls.op = op;
1663  op->key_to_element,
1664  (uint32_t) ibf_key.
1665  key_val,
1667  &send_cls);
1668 }
1669 
1670 
1678 static int
1680 {
1681  struct IBF_Key key;
1682  struct IBF_Key last_key;
1683  int side;
1684  unsigned int num_decoded;
1685  struct InvertibleBloomFilter *diff_ibf;
1686 
1688 
1689  if (GNUNET_OK !=
1690  prepare_ibf (op,
1691  op->remote_ibf->size))
1692  {
1693  GNUNET_break (0);
1694  /* allocation failed */
1695  return GNUNET_SYSERR;
1696  }
1697  diff_ibf = ibf_dup (op->local_ibf);
1698  ibf_subtract (diff_ibf,
1699  op->remote_ibf);
1700 
1701  ibf_destroy (op->remote_ibf);
1702  op->remote_ibf = NULL;
1703 
1705  "decoding IBF (size=%u)\n",
1706  diff_ibf->size);
1707 
1708  num_decoded = 0;
1709  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
1710 
1711  while (1)
1712  {
1713  int res;
1714  int cycle_detected = GNUNET_NO;
1715 
1716  last_key = key;
1717 
1718  res = ibf_decode (diff_ibf,
1719  &side,
1720  &key);
1721  if (res == GNUNET_OK)
1722  {
1724  "decoded ibf key %lx\n",
1725  (unsigned long) key.key_val);
1726  num_decoded += 1;
1727  if ((num_decoded > diff_ibf->size) ||
1728  ((num_decoded > 1) &&
1729  (last_key.key_val == key.key_val)))
1730  {
1732  "detected cyclic ibf (decoded %u/%u)\n",
1733  num_decoded,
1734  diff_ibf->size);
1735  cycle_detected = GNUNET_YES;
1736  }
1737  }
1738  if ((GNUNET_SYSERR == res) ||
1739  (GNUNET_YES == cycle_detected))
1740  {
1741  int next_order;
1742  next_order = 0;
1743  while (1 << next_order < diff_ibf->size)
1744  next_order++;
1745  next_order++;
1746  if (next_order <= MAX_IBF_ORDER)
1747  {
1749  "decoding failed, sending larger ibf (size %u)\n",
1750  1 << next_order);
1751  GNUNET_STATISTICS_update (_GSS_statistics,
1752  "# of IBF retries",
1753  1,
1754  GNUNET_NO);
1755  op->salt_send++;
1756  if (GNUNET_OK !=
1757  send_ibf (op, next_order))
1758  {
1759  /* Internal error, best we can do is shut the connection */
1761  "Failed to send IBF, closing connection\n");
1762  fail_union_operation (op);
1763  ibf_destroy (diff_ibf);
1764  return GNUNET_SYSERR;
1765  }
1766  }
1767  else
1768  {
1769  GNUNET_STATISTICS_update (_GSS_statistics,
1770  "# of failed union operations (too large)",
1771  1,
1772  GNUNET_NO);
1773  // XXX: Send the whole set, element-by-element
1775  "set union failed: reached ibf limit\n");
1776  fail_union_operation (op);
1777  ibf_destroy (diff_ibf);
1778  return GNUNET_SYSERR;
1779  }
1780  break;
1781  }
1782  if (GNUNET_NO == res)
1783  {
1784  struct GNUNET_MQ_Envelope *ev;
1785 
1787  "transmitted all values, sending DONE\n");
1788 
1789  perf_rtt.done.sent += 1;
1791  GNUNET_MQ_send (op->mq, ev);
1792  /* We now wait until we get a DONE message back
1793  * and then wait for our MQ to be flushed and all our
1794  * demands be delivered. */
1795  break;
1796  }
1797  if (1 == side)
1798  {
1799  struct IBF_Key unsalted_key;
1800 
1801  unsalt_key (&key,
1802  op->salt_receive,
1803  &unsalted_key);
1804  send_offers_for_key (op,
1805  unsalted_key);
1806  }
1807  else if (-1 == side)
1808  {
1809  struct GNUNET_MQ_Envelope *ev;
1810  struct InquiryMessage *msg;
1811 
1812  perf_rtt.inquery.sent += 1;
1813  perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key);
1814  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
1815  * the effort additional complexity. */
1816  ev = GNUNET_MQ_msg_extra (msg,
1817  sizeof(struct IBF_Key),
1819  msg->salt = htonl (op->salt_receive);
1820  GNUNET_memcpy (&msg[1],
1821  &key,
1822  sizeof(struct IBF_Key));
1824  "sending element inquiry for IBF key %lx\n",
1825  (unsigned long) key.key_val);
1826  GNUNET_MQ_send (op->mq, ev);
1827  }
1828  else
1829  {
1830  GNUNET_assert (0);
1831  }
1832  }
1833  ibf_destroy (diff_ibf);
1834  return GNUNET_OK;
1835 }
1836 
1837 
1848 static int
1850  const struct IBFMessage *msg)
1851 {
1852  struct Operation *op = cls;
1853  unsigned int buckets_in_message;
1854 
1855  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1856  / IBF_BUCKET_SIZE;
1857  if (0 == buckets_in_message)
1858  {
1859  GNUNET_break_op (0);
1860  return GNUNET_SYSERR;
1861  }
1862  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
1863  * IBF_BUCKET_SIZE)
1864  {
1865  GNUNET_break_op (0);
1866  return GNUNET_SYSERR;
1867  }
1868  if (op->phase == PHASE_EXPECT_IBF_LAST)
1869  {
1870  if (ntohl (msg->offset) != op->ibf_buckets_received)
1871  {
1872  GNUNET_break_op (0);
1873  return GNUNET_SYSERR;
1874  }
1875  if (1 << msg->order != op->remote_ibf->size)
1876  {
1877  GNUNET_break_op (0);
1878  return GNUNET_SYSERR;
1879  }
1880  if (ntohl (msg->salt) != op->salt_receive)
1881  {
1882  GNUNET_break_op (0);
1883  return GNUNET_SYSERR;
1884  }
1885  }
1886  else if ((op->phase != PHASE_PASSIVE_DECODING) &&
1887  (op->phase != PHASE_EXPECT_IBF))
1888  {
1889  GNUNET_break_op (0);
1890  return GNUNET_SYSERR;
1891  }
1892 
1893  return GNUNET_OK;
1894 }
1895 
1896 
1906 static void
1908  const struct IBFMessage *msg)
1909 {
1910  struct Operation *op = cls;
1911  unsigned int buckets_in_message;
1912 
1913  perf_rtt.ibf.received += 1;
1914  perf_rtt.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
1915 
1916  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
1917  / IBF_BUCKET_SIZE;
1918  if ((op->phase == PHASE_PASSIVE_DECODING) ||
1919  (op->phase == PHASE_EXPECT_IBF))
1920  {
1922  GNUNET_assert (NULL == op->remote_ibf);
1924  "Creating new ibf of size %u\n",
1925  1 << msg->order);
1926  op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
1927  op->salt_receive = ntohl (msg->salt);
1929  "Receiving new IBF with salt %u\n",
1930  op->salt_receive);
1931  if (NULL == op->remote_ibf)
1932  {
1934  "Failed to parse remote IBF, closing connection\n");
1935  fail_union_operation (op);
1936  return;
1937  }
1938  op->ibf_buckets_received = 0;
1939  if (0 != ntohl (msg->offset))
1940  {
1941  GNUNET_break_op (0);
1942  fail_union_operation (op);
1943  return;
1944  }
1945  }
1946  else
1947  {
1950  "Received more of IBF\n");
1951  }
1952  GNUNET_assert (NULL != op->remote_ibf);
1953 
1954  ibf_read_slice (&msg[1],
1956  buckets_in_message,
1957  op->remote_ibf);
1958  op->ibf_buckets_received += buckets_in_message;
1959 
1960  if (op->ibf_buckets_received == op->remote_ibf->size)
1961  {
1963  "received full ibf\n");
1965  if (GNUNET_OK !=
1966  decode_and_send (op))
1967  {
1968  /* Internal error, best we can do is shut down */
1970  "Failed to decode IBF, closing connection\n");
1971  fail_union_operation (op);
1972  return;
1973  }
1974  }
1976 }
1977 
1978 
1987 static void
1989  const struct GNUNET_SETU_Element *element,
1991 {
1992  struct GNUNET_MQ_Envelope *ev;
1993  struct GNUNET_SETU_ResultMessage *rm;
1994 
1996  "sending element (size %u) to client\n",
1997  element->size);
1998  GNUNET_assert (0 != op->client_request_id);
1999  ev = GNUNET_MQ_msg_extra (rm,
2000  element->size,
2002  if (NULL == ev)
2003  {
2004  GNUNET_MQ_discard (ev);
2005  GNUNET_break (0);
2006  return;
2007  }
2008  rm->result_status = htons (status);
2009  rm->request_id = htonl (op->client_request_id);
2010  rm->element_type = htons (element->element_type);
2012  op->key_to_element));
2013  GNUNET_memcpy (&rm[1],
2014  element->data,
2015  element->size);
2016  GNUNET_MQ_send (op->set->cs->mq,
2017  ev);
2018 }
2019 
2020 
2026 static void
2028 {
2029  unsigned int num_demanded;
2030 
2031  num_demanded = GNUNET_CONTAINER_multihashmap_size (
2032  op->demanded_hashes);
2033 
2034  if (PHASE_FINISH_WAITING == op->phase)
2035  {
2037  "In PHASE_FINISH_WAITING, pending %u demands\n",
2038  num_demanded);
2039  if (0 == num_demanded)
2040  {
2041  struct GNUNET_MQ_Envelope *ev;
2042 
2043  op->phase = PHASE_FINISHED;
2044  perf_rtt.done.sent += 1;
2046  GNUNET_MQ_send (op->mq,
2047  ev);
2048  /* We now wait until the other peer sends P2P_OVER
2049  * after it got all elements from us. */
2050  }
2051  }
2052  if (PHASE_FINISH_CLOSING == op->phase)
2053  {
2055  "In PHASE_FINISH_CLOSING, pending %u demands\n",
2056  num_demanded);
2057  if (0 == num_demanded)
2058  {
2059  op->phase = PHASE_FINISHED;
2060  send_client_done (op);
2062  }
2063  }
2064 }
2065 
2066 
2073 static int
2075  const struct GNUNET_SETU_ElementMessage *emsg)
2076 {
2077  struct Operation *op = cls;
2078 
2080  {
2081  GNUNET_break_op (0);
2082  return GNUNET_SYSERR;
2083  }
2084  return GNUNET_OK;
2085 }
2086 
2087 
2096 static void
2098  const struct GNUNET_SETU_ElementMessage *emsg)
2099 {
2100  struct Operation *op = cls;
2101  struct ElementEntry *ee;
2102  struct KeyEntry *ke;
2103  uint16_t element_size;
2104 
2105 
2106  element_size = ntohs (emsg->header.size) - sizeof(struct
2108  perf_rtt.element.received += 1;
2110 
2111  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2112  GNUNET_memcpy (&ee[1],
2113  &emsg[1],
2114  element_size);
2115  ee->element.size = element_size;
2116  ee->element.data = &ee[1];
2117  ee->element.element_type = ntohs (emsg->element_type);
2118  ee->remote = GNUNET_YES;
2120  &ee->element_hash);
2121  if (GNUNET_NO ==
2123  &ee->element_hash,
2124  NULL))
2125  {
2126  /* We got something we didn't demand, since it's not in our map. */
2127  GNUNET_break_op (0);
2128  fail_union_operation (op);
2129  return;
2130  }
2131 
2133  "Got element (size %u, hash %s) from peer\n",
2134  (unsigned int) element_size,
2135  GNUNET_h2s (&ee->element_hash));
2136 
2137  GNUNET_STATISTICS_update (_GSS_statistics,
2138  "# received elements",
2139  1,
2140  GNUNET_NO);
2141  GNUNET_STATISTICS_update (_GSS_statistics,
2142  "# exchanged elements",
2143  1,
2144  GNUNET_NO);
2145 
2146  op->received_total++;
2147 
2148  ke = op_get_element (op,
2149  &ee->element_hash);
2150  if (NULL != ke)
2151  {
2152  /* Got repeated element. Should not happen since
2153  * we track demands. */
2154  GNUNET_STATISTICS_update (_GSS_statistics,
2155  "# repeated elements",
2156  1,
2157  GNUNET_NO);
2158  ke->received = GNUNET_YES;
2159  GNUNET_free (ee);
2160  }
2161  else
2162  {
2164  "Registering new element from remote peer\n");
2165  op->received_fresh++;
2166  op_register_element (op, ee, GNUNET_YES);
2167  /* only send results immediately if the client wants it */
2168  send_client_element (op,
2169  &ee->element,
2171  }
2172 
2173  if ((op->received_total > 8) &&
2174  (op->received_fresh < op->received_total / 3))
2175  {
2176  /* The other peer gave us lots of old elements, there's something wrong. */
2177  GNUNET_break_op (0);
2178  fail_union_operation (op);
2179  return;
2180  }
2182  maybe_finish (op);
2183 }
2184 
2185 
2192 static int
2194  const struct GNUNET_SETU_ElementMessage *emsg)
2195 {
2196  struct Operation *op = cls;
2197 
2198  (void) op;
2199 
2200  // FIXME: check that we expect full elements here?
2201  return GNUNET_OK;
2202 }
2203 
2204 
2211 static void
2213  const struct GNUNET_SETU_ElementMessage *emsg)
2214 {
2215  struct Operation *op = cls;
2216  struct ElementEntry *ee;
2217  struct KeyEntry *ke;
2218  uint16_t element_size;
2219 
2220 
2221 
2222  if(PHASE_EXPECT_IBF == op->phase) {
2224  }
2225 
2226 
2227 
2228  /* Allow only receiving of full element message if in expect IBF or in PHASE_FULL_RECEIVING state */
2229  if ((PHASE_FULL_RECEIVING != op->phase) &&
2230  (PHASE_FULL_SENDING != op->phase))
2231  {
2233  "Handle full element phase is %u\n",
2234  (unsigned) op->phase);
2235  GNUNET_break_op (0);
2236  fail_union_operation (op);
2237  return;
2238  }
2239 
2240 
2241 
2242  element_size = ntohs (emsg->header.size)
2243  - sizeof(struct GNUNET_SETU_ElementMessage);
2244 
2247 
2248  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
2249  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
2250  ee->element.size = element_size;
2251  ee->element.data = &ee[1];
2252  ee->element.element_type = ntohs (emsg->element_type);
2253  ee->remote = GNUNET_YES;
2255  &ee->element_hash);
2257  "Got element (full diff, size %u, hash %s) from peer\n",
2258  (unsigned int) element_size,
2259  GNUNET_h2s (&ee->element_hash));
2260 
2261  GNUNET_STATISTICS_update (_GSS_statistics,
2262  "# received elements",
2263  1,
2264  GNUNET_NO);
2265  GNUNET_STATISTICS_update (_GSS_statistics,
2266  "# exchanged elements",
2267  1,
2268  GNUNET_NO);
2269 
2270  op->received_total++;
2271 
2272  ke = op_get_element (op,
2273  &ee->element_hash);
2274  if (NULL != ke)
2275  {
2276  /* Got repeated element. Should not happen since
2277  * we track demands. */
2278  GNUNET_STATISTICS_update (_GSS_statistics,
2279  "# repeated elements",
2280  1,
2281  GNUNET_NO);
2282  ke->received = GNUNET_YES;
2283  GNUNET_free (ee);
2284  }
2285  else
2286  {
2288  "Registering new element from remote peer\n");
2289  op->received_fresh++;
2290  op_register_element (op, ee, GNUNET_YES);
2291  /* only send results immediately if the client wants it */
2292  send_client_element (op,
2293  &ee->element,
2295  }
2296 
2297  if ((GNUNET_YES == op->byzantine) &&
2298  (op->received_total > 384 + op->received_fresh * 4) &&
2299  (op->received_fresh < op->received_total / 6))
2300  {
2301  /* The other peer gave us lots of old elements, there's something wrong. */
2303  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
2304  (unsigned long long) op->received_fresh,
2305  (unsigned long long) op->received_total);
2306  GNUNET_break_op (0);
2307  fail_union_operation (op);
2308  return;
2309  }
2311 }
2312 
2313 
2321 static int
2323  const struct InquiryMessage *msg)
2324 {
2325  struct Operation *op = cls;
2326  unsigned int num_keys;
2327 
2328  if (op->phase != PHASE_PASSIVE_DECODING)
2329  {
2330  GNUNET_break_op (0);
2331  return GNUNET_SYSERR;
2332  }
2333  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2334  / sizeof(struct IBF_Key);
2335  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2336  != num_keys * sizeof(struct IBF_Key))
2337  {
2338  GNUNET_break_op (0);
2339  return GNUNET_SYSERR;
2340  }
2341  return GNUNET_OK;
2342 }
2343 
2344 
2351 static void
2353  const struct InquiryMessage *msg)
2354 {
2355  struct Operation *op = cls;
2356  const struct IBF_Key *ibf_key;
2357  unsigned int num_keys;
2358 
2359  perf_rtt.inquery.received += 1;
2360  perf_rtt.inquery.received_var_bytes += (ntohs (msg->header.size) - sizeof(struct InquiryMessage));
2361 
2363  "Received union inquiry\n");
2364  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
2365  / sizeof(struct IBF_Key);
2366  ibf_key = (const struct IBF_Key *) &msg[1];
2367  while (0 != num_keys--)
2368  {
2369  struct IBF_Key unsalted_key;
2370 
2371  unsalt_key (ibf_key,
2372  ntohl (msg->salt),
2373  &unsalted_key);
2374  send_offers_for_key (op,
2375  unsalted_key);
2376  ibf_key++;
2377  }
2379 }
2380 
2381 
2392 static int
2394  uint32_t key,
2395  void *value)
2396 {
2397  struct Operation *op = cls;
2398  struct KeyEntry *ke = value;
2399  struct GNUNET_MQ_Envelope *ev;
2400  struct GNUNET_SETU_ElementMessage *emsg;
2401  struct ElementEntry *ee = ke->element;
2402 
2403  if (GNUNET_YES == ke->received)
2404  return GNUNET_YES;
2406  ev = GNUNET_MQ_msg_extra (emsg,
2407  ee->element.size,
2409  GNUNET_memcpy (&emsg[1],
2410  ee->element.data,
2411  ee->element.size);
2412  emsg->element_type = htons (ee->element.element_type);
2413  GNUNET_MQ_send (op->mq,
2414  ev);
2415  return GNUNET_YES;
2416 }
2417 
2418 
2425 static void
2427  const struct GNUNET_MessageHeader *mh)
2428 {
2429  struct Operation *op = cls;
2430 
2432 
2434  "Received request for full set transmission\n");
2435  if (PHASE_EXPECT_IBF != op->phase)
2436  {
2437  GNUNET_break_op (0);
2438  fail_union_operation (op);
2439  return;
2440  }
2441 
2442  // FIXME: we need to check that our set is larger than the
2443  // byzantine_lower_bound by some threshold
2444  send_full_set (op);
2446 }
2447 
2448 
2455 static void
2457  const struct GNUNET_MessageHeader *mh)
2458 {
2459  struct Operation *op = cls;
2460 
2462 
2463  switch (op->phase)
2464  {
2465  case PHASE_FULL_RECEIVING:
2466  {
2467  struct GNUNET_MQ_Envelope *ev;
2468 
2470  "got FULL DONE, sending elements that other peer is missing\n");
2471 
2472  /* send all the elements that did not come from the remote peer */
2475  op);
2476  perf_rtt.full_done.sent += 1;
2478  GNUNET_MQ_send (op->mq,
2479  ev);
2480  op->phase = PHASE_FINISHED;
2481  /* we now wait until the other peer sends us the OVER message*/
2482  }
2483  break;
2484 
2485  case PHASE_FULL_SENDING:
2486  {
2488  "got FULL DONE, finishing\n");
2489  /* We sent the full set, and got the response for that. We're done. */
2490  op->phase = PHASE_FINISHED;
2492  send_client_done (op);
2494  return;
2495  }
2496 
2497  default:
2499  "Handle full done phase is %u\n",
2500  (unsigned) op->phase);
2501  GNUNET_break_op (0);
2502  fail_union_operation (op);
2503  return;
2504  }
2506 }
2507 
2508 
2517 static int
2519  const struct GNUNET_MessageHeader *mh)
2520 {
2521  struct Operation *op = cls;
2522  unsigned int num_hashes;
2523 
2524  (void) op;
2525  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2526  / sizeof(struct GNUNET_HashCode);
2527  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2528  != num_hashes * sizeof(struct GNUNET_HashCode))
2529  {
2530  GNUNET_break_op (0);
2531  return GNUNET_SYSERR;
2532  }
2533  return GNUNET_OK;
2534 }
2535 
2536 
2544 static void
2546  const struct GNUNET_MessageHeader *mh)
2547 {
2548  struct Operation *op = cls;
2549  struct ElementEntry *ee;
2550  struct GNUNET_SETU_ElementMessage *emsg;
2551  const struct GNUNET_HashCode *hash;
2552  unsigned int num_hashes;
2553  struct GNUNET_MQ_Envelope *ev;
2554 
2555  perf_rtt.demand.received += 1;
2556  perf_rtt.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader));
2557 
2558  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2559  / sizeof(struct GNUNET_HashCode);
2560  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2561  num_hashes > 0;
2562  hash++, num_hashes--)
2563  {
2565  hash);
2566  if (NULL == ee)
2567  {
2568  /* Demand for non-existing element. */
2569  GNUNET_break_op (0);
2570  fail_union_operation (op);
2571  return;
2572  }
2573  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
2574  {
2575  /* Probably confused lazily copied sets. */
2576  GNUNET_break_op (0);
2577  fail_union_operation (op);
2578  return;
2579  }
2580  perf_rtt.element.sent += 1;
2582  ev = GNUNET_MQ_msg_extra (emsg,
2583  ee->element.size,
2585  GNUNET_memcpy (&emsg[1],
2586  ee->element.data,
2587  ee->element.size);
2588  emsg->reserved = htons (0);
2589  emsg->element_type = htons (ee->element.element_type);
2591  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
2592  op,
2593  (unsigned int) ee->element.size,
2594  GNUNET_h2s (&ee->element_hash));
2595  GNUNET_MQ_send (op->mq, ev);
2596  GNUNET_STATISTICS_update (_GSS_statistics,
2597  "# exchanged elements",
2598  1,
2599  GNUNET_NO);
2600  if (op->symmetric)
2601  send_client_element (op,
2602  &ee->element,
2604  }
2606 }
2607 
2608 
2616 static int
2618  const struct GNUNET_MessageHeader *mh)
2619 {
2620  struct Operation *op = cls;
2621  unsigned int num_hashes;
2622 
2623  /* look up elements and send them */
2624  if ((op->phase != PHASE_PASSIVE_DECODING) &&
2625  (op->phase != PHASE_ACTIVE_DECODING))
2626  {
2627  GNUNET_break_op (0);
2628  return GNUNET_SYSERR;
2629  }
2630  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2631  / sizeof(struct GNUNET_HashCode);
2632  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
2633  num_hashes * sizeof(struct GNUNET_HashCode))
2634  {
2635  GNUNET_break_op (0);
2636  return GNUNET_SYSERR;
2637  }
2638  return GNUNET_OK;
2639 }
2640 
2641 
2649 static void
2651  const struct GNUNET_MessageHeader *mh)
2652 {
2653  struct Operation *op = cls;
2654  const struct GNUNET_HashCode *hash;
2655  unsigned int num_hashes;
2656 
2657  perf_rtt.offer.received += 1;
2658  perf_rtt.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader));
2659 
2660  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
2661  / sizeof(struct GNUNET_HashCode);
2662  for (hash = (const struct GNUNET_HashCode *) &mh[1];
2663  num_hashes > 0;
2664  hash++, num_hashes--)
2665  {
2666  struct ElementEntry *ee;
2667  struct GNUNET_MessageHeader *demands;
2668  struct GNUNET_MQ_Envelope *ev;
2669 
2671  hash);
2672  if (NULL != ee)
2673  if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
2674  continue;
2675 
2676  if (GNUNET_YES ==
2678  hash))
2679  {
2681  "Skipped sending duplicate demand\n");
2682  continue;
2683  }
2684 
2687  op->demanded_hashes,
2688  hash,
2689  NULL,
2691 
2693  "[OP %p] Requesting element (hash %s)\n",
2694  op, GNUNET_h2s (hash));
2695 
2696  perf_rtt.demand.sent += 1;
2697  perf_rtt.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2698  ev = GNUNET_MQ_msg_header_extra (demands,
2699  sizeof(struct GNUNET_HashCode),
2701  GNUNET_memcpy (&demands[1],
2702  hash,
2703  sizeof(struct GNUNET_HashCode));
2704  GNUNET_MQ_send (op->mq, ev);
2705  }
2707 }
2708 
2709 
2716 static void
2718  const struct GNUNET_MessageHeader *mh)
2719 {
2720  struct Operation *op = cls;
2721 
2722  perf_rtt.done.received += 1;
2723  switch (op->phase)
2724  {
2726  /* We got all requests, but still have to send our elements in response. */
2729  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
2730  /* The active peer is done sending offers
2731  * and inquiries. This means that all
2732  * our responses to that (demands and offers)
2733  * must be in flight (queued or in mesh).
2734  *
2735  * We should notify the active peer once
2736  * all our demands are satisfied, so that the active
2737  * peer can quit if we gave it everything.
2739  maybe_finish (op);
2740  return;
2741  case PHASE_ACTIVE_DECODING:
2743  "got DONE (as active partner), waiting to finish\n");
2744  /* All demands of the other peer are satisfied,
2745  * and we processed all offers, thus we know
2746  * exactly what our demands must be.
2747  *
2748  * We'll close the channel
2749  * to the other peer once our demands are met.
2750  */op->phase = PHASE_FINISH_CLOSING;
2752  maybe_finish (op);
2753  return;
2754  default:
2755  GNUNET_break_op (0);
2756  fail_union_operation (op);
2757  return;
2758  }
2759 }
2760 
2761 
2768 static void
2770  const struct GNUNET_MessageHeader *mh)
2771 {
2772  perf_rtt.over.received += 1;
2773  send_client_done (cls);
2774 }
2775 
2776 
2785 static struct Operation *
2786 get_incoming (uint32_t id)
2787 {
2788  for (struct Listener *listener = listener_head;
2789  NULL != listener;
2790  listener = listener->next)
2791  {
2792  for (struct Operation *op = listener->op_head;
2793  NULL != op;
2794  op = op->next)
2795  if (op->suggest_id == id)
2796  return op;
2797  }
2798  return NULL;
2799 }
2800 
2801 
2810 static void *
2812  struct GNUNET_SERVICE_Client *c,
2813  struct GNUNET_MQ_Handle *mq)
2814 {
2815  struct ClientState *cs;
2816 
2817  num_clients++;
2818  cs = GNUNET_new (struct ClientState);
2819  cs->client = c;
2820  cs->mq = mq;
2821  return cs;
2822 }
2823 
2824 
2833 static int
2835  const struct GNUNET_HashCode *key,
2836  void *value)
2837 {
2838  struct ElementEntry *ee = value;
2839 
2840  GNUNET_free (ee);
2841  return GNUNET_YES;
2842 }
2843 
2844 
2852 static void
2854  struct GNUNET_SERVICE_Client *client,
2855  void *internal_cls)
2856 {
2857  struct ClientState *cs = internal_cls;
2858  struct Operation *op;
2859  struct Listener *listener;
2860  struct Set *set;
2861 
2863  "Client disconnected, cleaning up\n");
2864  if (NULL != (set = cs->set))
2865  {
2866  struct SetContent *content = set->content;
2867 
2869  "Destroying client's set\n");
2870  /* Destroy pending set operations */
2871  while (NULL != set->ops_head)
2872  _GSS_operation_destroy (set->ops_head);
2873 
2874  /* Destroy operation-specific state */
2875  if (NULL != set->se)
2876  {
2877  strata_estimator_destroy (set->se);
2878  set->se = NULL;
2879  }
2880  /* free set content (or at least decrement RC) */
2881  set->content = NULL;
2882  GNUNET_assert (0 != content->refcount);
2883  content->refcount--;
2884  if (0 == content->refcount)
2885  {
2886  GNUNET_assert (NULL != content->elements);
2889  NULL);
2891  content->elements = NULL;
2892  GNUNET_free (content);
2893  }
2894  GNUNET_free (set);
2895  }
2896 
2897  if (NULL != (listener = cs->listener))
2898  {
2900  "Destroying client's listener\n");
2901  GNUNET_CADET_close_port (listener->open_port);
2902  listener->open_port = NULL;
2903  while (NULL != (op = listener->op_head))
2904  {
2906  "Destroying incoming operation `%u' from peer `%s'\n",
2907  (unsigned int) op->client_request_id,
2908  GNUNET_i2s (&op->peer));
2909  incoming_destroy (op);
2910  }
2911  GNUNET_CONTAINER_DLL_remove (listener_head,
2912  listener_tail,
2913  listener);
2914  GNUNET_free (listener);
2915  }
2916  GNUNET_free (cs);
2917  num_clients--;
2918  if ( (GNUNET_YES == in_shutdown) &&
2919  (0 == num_clients) )
2920  {
2921  if (NULL != cadet)
2922  {
2923  GNUNET_CADET_disconnect (cadet);
2924  cadet = NULL;
2925  }
2926  }
2927 }
2928 
2929 
2938 static int
2940  const struct OperationRequestMessage *msg)
2941 {
2942  struct Operation *op = cls;
2943  struct Listener *listener = op->listener;
2944  const struct GNUNET_MessageHeader *nested_context;
2945 
2946  /* double operation request */
2947  if (0 != op->suggest_id)
2948  {
2949  GNUNET_break_op (0);
2950  return GNUNET_SYSERR;
2951  }
2952  /* This should be equivalent to the previous condition, but can't hurt to check twice */
2953  if (NULL == listener)
2954  {
2955  GNUNET_break (0);
2956  return GNUNET_SYSERR;
2957  }
2958  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2959  if ((NULL != nested_context) &&
2960  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
2961  {
2962  GNUNET_break_op (0);
2963  return GNUNET_SYSERR;
2964  }
2965  return GNUNET_OK;
2966 }
2967 
2968 
2984 static void
2986  const struct OperationRequestMessage *msg)
2987 {
2988  struct Operation *op = cls;
2989  struct Listener *listener = op->listener;
2990  const struct GNUNET_MessageHeader *nested_context;
2991  struct GNUNET_MQ_Envelope *env;
2992  struct GNUNET_SETU_RequestMessage *cmsg;
2993 
2994  nested_context = GNUNET_MQ_extract_nested_mh (msg);
2995  /* Make a copy of the nested_context (application-specific context
2996  information that is opaque to set) so we can pass it to the
2997  listener later on */
2998  if (NULL != nested_context)
2999  op->context_msg = GNUNET_copy_message (nested_context);
3000  op->remote_element_count = ntohl (msg->element_count);
3001  GNUNET_log (
3003  "Received P2P operation request (port %s) for active listener\n",
3004  GNUNET_h2s (&op->listener->app_id));
3005  GNUNET_assert (0 == op->suggest_id);
3006  if (0 == suggest_id)
3007  suggest_id++;
3008  op->suggest_id = suggest_id++;
3009  GNUNET_assert (NULL != op->timeout_task);
3011  op->timeout_task = NULL;
3012  env = GNUNET_MQ_msg_nested_mh (cmsg,
3014  op->context_msg);
3015  GNUNET_log (
3017  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
3018  op->suggest_id,
3019  listener,
3020  listener->cs);
3021  cmsg->accept_id = htonl (op->suggest_id);
3022  cmsg->peer_id = op->peer;
3023  GNUNET_MQ_send (listener->cs->mq,
3024  env);
3025  /* NOTE: GNUNET_CADET_receive_done() will be called in
3026  #handle_client_accept() */
3027 }
3028 
3029 
3038 static void
3040  const struct GNUNET_SETU_CreateMessage *msg)
3041 {
3042  struct ClientState *cs = cls;
3043  struct Set *set;
3044 
3046  "Client created new set for union operation\n");
3047  if (NULL != cs->set)
3048  {
3049  /* There can only be one set per client */
3050  GNUNET_break (0);
3052  return;
3053  }
3054  set = GNUNET_new (struct Set);
3055  {
3056  struct StrataEstimator *se;
3057 
3059  SE_IBF_SIZE,
3060  SE_IBF_HASH_NUM);
3061  if (NULL == se)
3062  {
3064  "Failed to allocate strata estimator\n");
3065  GNUNET_free (set);
3067  return;
3068  }
3069  set->se = se;
3070  }
3071  set->content = GNUNET_new (struct SetContent);
3072  set->content->refcount = 1;
3073  set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
3074  GNUNET_YES);
3075  set->cs = cs;
3076  cs->set = set;
3078 }
3079 
3080 
3090 static void
3092 {
3093  struct Operation *op = cls;
3094 
3095  op->timeout_task = NULL;
3097  "Remote peer's incoming request timed out\n");
3098  incoming_destroy (op);
3099 }
3100 
3101 
3118 static void *
3119 channel_new_cb (void *cls,
3120  struct GNUNET_CADET_Channel *channel,
3121  const struct GNUNET_PeerIdentity *source)
3122 {
3123  struct Listener *listener = cls;
3124  struct Operation *op;
3125 
3127  "New incoming channel\n");
3128  op = GNUNET_new (struct Operation);
3129  op->listener = listener;
3130  op->peer = *source;
3131  op->channel = channel;
3132  op->mq = GNUNET_CADET_get_mq (op->channel);
3134  UINT32_MAX);
3137  op);
3139  listener->op_tail,
3140  op);
3141  return op;
3142 }
3143 
3144 
3161 static void
3162 channel_end_cb (void *channel_ctx,
3163  const struct GNUNET_CADET_Channel *channel)
3164 {
3165  struct Operation *op = channel_ctx;
3166 
3167  op->channel = NULL;
3169 }
3170 
3171 
3186 static void
3188  const struct GNUNET_CADET_Channel *channel,
3189  int window_size)
3190 {
3191  /* FIXME: not implemented, we could do flow control here... */
3192 }
3193 
3194 
3201 static void
3203  const struct GNUNET_SETU_ListenMessage *msg)
3204 {
3205  struct ClientState *cs = cls;
3206  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3207  GNUNET_MQ_hd_var_size (incoming_msg,
3209  struct OperationRequestMessage,
3210  NULL),
3211  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3213  struct IBFMessage,
3214  NULL),
3215  GNUNET_MQ_hd_var_size (union_p2p_elements,
3218  NULL),
3219  GNUNET_MQ_hd_var_size (union_p2p_offer,
3221  struct GNUNET_MessageHeader,
3222  NULL),
3223  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3225  struct InquiryMessage,
3226  NULL),
3227  GNUNET_MQ_hd_var_size (union_p2p_demand,
3229  struct GNUNET_MessageHeader,
3230  NULL),
3231  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3233  struct GNUNET_MessageHeader,
3234  NULL),
3235  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3237  struct GNUNET_MessageHeader,
3238  NULL),
3239  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3241  struct GNUNET_MessageHeader,
3242  NULL),
3243  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3245  struct GNUNET_MessageHeader,
3246  NULL),
3247  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3249  struct StrataEstimatorMessage,
3250  NULL),
3251  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3253  struct StrataEstimatorMessage,
3254  NULL),
3255  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3258  NULL),
3260  };
3261  struct Listener *listener;
3262 
3263  if (NULL != cs->listener)
3264  {
3265  /* max. one active listener per client! */
3266  GNUNET_break (0);
3268  return;
3269  }
3270  listener = GNUNET_new (struct Listener);
3271  listener->cs = cs;
3272  cs->listener = listener;
3273  listener->app_id = msg->app_id;
3274  GNUNET_CONTAINER_DLL_insert (listener_head,
3275  listener_tail,
3276  listener);
3278  "New listener created (port %s)\n",
3279  GNUNET_h2s (&listener->app_id));
3280  listener->open_port = GNUNET_CADET_open_port (cadet,
3281  &msg->app_id,
3282  &channel_new_cb,
3283  listener,
3285  &channel_end_cb,
3286  cadet_handlers);
3288 }
3289 
3290 
3298 static void
3300  const struct GNUNET_SETU_RejectMessage *msg)
3301 {
3302  struct ClientState *cs = cls;
3303  struct Operation *op;
3304 
3305  op = get_incoming (ntohl (msg->accept_reject_id));
3306  if (NULL == op)
3307  {
3308  /* no matching incoming operation for this reject;
3309  could be that the other peer already disconnected... */
3311  "Client rejected unknown operation %u\n",
3312  (unsigned int) ntohl (msg->accept_reject_id));
3314  return;
3315  }
3317  "Peer request (app %s) rejected by client\n",
3318  GNUNET_h2s (&cs->listener->app_id));
3321 }
3322 
3323 
3330 static int
3332  const struct GNUNET_SETU_ElementMessage *msg)
3333 {
3334  /* NOTE: Technically, we should probably check with the
3335  block library whether the element we are given is well-formed */
3336  return GNUNET_OK;
3337 }
3338 
3339 
3346 static void
3348  const struct GNUNET_SETU_ElementMessage *msg)
3349 {
3350  struct ClientState *cs = cls;
3351  struct Set *set;
3352  struct GNUNET_SETU_Element el;
3353  struct ElementEntry *ee;
3354  struct GNUNET_HashCode hash;
3355 
3356  if (NULL == (set = cs->set))
3357  {
3358  /* client without a set requested an operation */
3359  GNUNET_break (0);
3361  return;
3362  }
3364  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
3365  el.size = ntohs (msg->header.size) - sizeof(*msg);
3366  el.data = &msg[1];
3367  el.element_type = ntohs (msg->element_type);
3369  &hash);
3370  ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
3371  &hash);
3372  if (NULL == ee)
3373  {
3375  "Client inserts element %s of size %u\n",
3376  GNUNET_h2s (&hash),
3377  el.size);
3378  ee = GNUNET_malloc (el.size + sizeof(*ee));
3379  ee->element.size = el.size;
3380  GNUNET_memcpy (&ee[1], el.data, el.size);
3381  ee->element.data = &ee[1];
3383  ee->remote = GNUNET_NO;
3384  ee->generation = set->current_generation;
3385  ee->element_hash = hash;
3388  set->content->elements,
3389  &ee->element_hash,
3390  ee,
3392  }
3393  else
3394  {
3396  "Client inserted element %s of size %u twice (ignored)\n",
3397  GNUNET_h2s (&hash),
3398  el.size);
3399  /* same element inserted twice */
3400  return;
3401  }
3402  strata_estimator_insert (set->se,
3403  get_ibf_key (&ee->element_hash));
3404 }
3405 
3406 
3413 static void
3415 {
3416  set->content->latest_generation++;
3417  set->current_generation++;
3418 }
3419 
3420 
3430 static int
3432  const struct GNUNET_SETU_EvaluateMessage *msg)
3433 {
3434  /* FIXME: suboptimal, even if the context below could be NULL,
3435  there are malformed messages this does not check for... */
3436  return GNUNET_OK;
3437 }
3438 
3439 
3448 static void
3450  const struct GNUNET_SETU_EvaluateMessage *msg)
3451 {
3452  struct ClientState *cs = cls;
3453  struct Operation *op = GNUNET_new (struct Operation);
3454  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
3455  GNUNET_MQ_hd_var_size (incoming_msg,
3457  struct OperationRequestMessage,
3458  op),
3459  GNUNET_MQ_hd_var_size (union_p2p_ibf,
3461  struct IBFMessage,
3462  op),
3463  GNUNET_MQ_hd_var_size (union_p2p_elements,
3466  op),
3467  GNUNET_MQ_hd_var_size (union_p2p_offer,
3469  struct GNUNET_MessageHeader,
3470  op),
3471  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
3473  struct InquiryMessage,
3474  op),
3475  GNUNET_MQ_hd_var_size (union_p2p_demand,
3477  struct GNUNET_MessageHeader,
3478  op),
3479  GNUNET_MQ_hd_fixed_size (union_p2p_done,
3481  struct GNUNET_MessageHeader,
3482  op),
3483  GNUNET_MQ_hd_fixed_size (union_p2p_over,
3485  struct GNUNET_MessageHeader,
3486  op),
3487  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
3489  struct GNUNET_MessageHeader,
3490  op),
3491  GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
3493  struct GNUNET_MessageHeader,
3494  op),
3495  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3497  struct StrataEstimatorMessage,
3498  op),
3499  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
3501  struct StrataEstimatorMessage,
3502  op),
3503  GNUNET_MQ_hd_var_size (union_p2p_full_element,
3506  op),
3508  };
3509  struct Set *set;
3510  const struct GNUNET_MessageHeader *context;
3511 
3512  if (NULL == (set = cs->set))
3513  {
3514  GNUNET_break (0);
3515  GNUNET_free (op);
3517  return;
3518  }
3520  UINT32_MAX);
3521  op->peer = msg->target_peer;
3522  op->client_request_id = ntohl (msg->request_id);
3523  op->byzantine = msg->byzantine;
3524  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3525  op->force_full = msg->force_full;
3526  op->force_delta = msg->force_delta;
3527  op->symmetric = msg->symmetric;
3528  context = GNUNET_MQ_extract_nested_mh (msg);
3529 
3530  /* Advance generation values, so that
3531  mutations won't interfer with the running operation. */
3532  op->set = set;
3533  op->generation_created = set->current_generation;
3534  advance_generation (set);
3535  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3536  set->ops_tail,
3537  op);
3539  "Creating new CADET channel to port %s for set union\n",
3540  GNUNET_h2s (&msg->app_id));
3541  op->channel = GNUNET_CADET_channel_create (cadet,
3542  op,
3543  &msg->target_peer,
3544  &msg->app_id,
3546  &channel_end_cb,
3547  cadet_handlers);
3548  op->mq = GNUNET_CADET_get_mq (op->channel);
3549  {
3550  struct GNUNET_MQ_Envelope *ev;
3551  struct OperationRequestMessage *msg;
3552 
3554  ev = GNUNET_MQ_msg_nested_mh (msg,
3556  context);
3557  if (NULL == ev)
3558  {
3559  /* the context message is too large */
3560  GNUNET_break (0);
3562  return;
3563  }
3565  GNUNET_NO);
3566  /* copy the current generation's strata estimator for this operation */
3567  op->se = strata_estimator_dup (op->set->se);
3568  /* we started the operation, thus we have to send the operation request */
3569  op->phase = PHASE_EXPECT_SE;
3570  op->salt_receive = op->salt_send = 42; // FIXME?????
3572  "Initiating union operation evaluation\n");
3573  GNUNET_STATISTICS_update (_GSS_statistics,
3574  "# of total union operations",
3575  1,
3576  GNUNET_NO);
3577  GNUNET_STATISTICS_update (_GSS_statistics,
3578  "# of initiated union operations",
3579  1,
3580  GNUNET_NO);
3581  GNUNET_MQ_send (op->mq,
3582  ev);
3583  if (NULL != context)
3585  "sent op request with context message\n");
3586  else
3588  "sent op request without context message\n");
3591  op->key_to_element);
3592 
3593  }
3595 }
3596 
3597 
3604 static void
3606  const struct GNUNET_SETU_CancelMessage *msg)
3607 {
3608  struct ClientState *cs = cls;
3609  struct Set *set;
3610  struct Operation *op;
3611  int found;
3612 
3613  if (NULL == (set = cs->set))
3614  {
3615  /* client without a set requested an operation */
3616  GNUNET_break (0);
3618  return;
3619  }
3620  found = GNUNET_NO;
3621  for (op = set->ops_head; NULL != op; op = op->next)
3622  {
3623  if (op->client_request_id == ntohl (msg->request_id))
3624  {
3625  found = GNUNET_YES;
3626  break;
3627  }
3628  }
3629  if (GNUNET_NO == found)
3630  {
3631  /* It may happen that the operation was already destroyed due to
3632  * the other peer disconnecting. The client may not know about this
3633  * yet and try to cancel the (just barely non-existent) operation.
3634  * So this is not a hard error.
3635  *///
3637  "Client canceled non-existent op %u\n",
3638  (uint32_t) ntohl (msg->request_id));
3639  }
3640  else
3641  {
3643  "Client requested cancel for op %u\n",
3644  (uint32_t) ntohl (msg->request_id));
3646  }
3648 }
3649 
3650 
3659 static void
3661  const struct GNUNET_SETU_AcceptMessage *msg)
3662 {
3663  struct ClientState *cs = cls;
3664  struct Set *set;
3665  struct Operation *op;
3666  struct GNUNET_SETU_ResultMessage *result_message;
3667  struct GNUNET_MQ_Envelope *ev;
3668  struct Listener *listener;
3669 
3670  if (NULL == (set = cs->set))
3671  {
3672  /* client without a set requested to accept */
3673  GNUNET_break (0);
3675  return;
3676  }
3677  op = get_incoming (ntohl (msg->accept_reject_id));
3678  if (NULL == op)
3679  {
3680  /* It is not an error if the set op does not exist -- it may
3681  * have been destroyed when the partner peer disconnected. */
3682  GNUNET_log (
3684  "Client %p accepted request %u of listener %p that is no longer active\n",
3685  cs,
3686  ntohl (msg->accept_reject_id),
3687  cs->listener);
3688  ev = GNUNET_MQ_msg (result_message,
3690  result_message->request_id = msg->request_id;
3691  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
3692  GNUNET_MQ_send (set->cs->mq, ev);
3694  return;
3695  }
3697  "Client accepting request %u\n",
3698  (uint32_t) ntohl (msg->accept_reject_id));
3699  listener = op->listener;
3700  op->listener = NULL;
3702  listener->op_tail,
3703  op);
3704  op->set = set;
3705  GNUNET_CONTAINER_DLL_insert (set->ops_head,
3706  set->ops_tail,
3707  op);
3708  op->client_request_id = ntohl (msg->request_id);
3709  op->byzantine = msg->byzantine;
3710  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
3711  op->force_full = msg->force_full;
3712  op->force_delta = msg->force_delta;
3713  op->symmetric = msg->symmetric;
3714 
3715  /* Advance generation values, so that future mutations do not
3716  interfer with the running operation. */
3717  op->generation_created = set->current_generation;
3718  advance_generation (set);
3719  GNUNET_assert (NULL == op->se);
3720 
3722  "accepting set union operation\n");
3723  GNUNET_STATISTICS_update (_GSS_statistics,
3724  "# of accepted union operations",
3725  1,
3726  GNUNET_NO);
3727  GNUNET_STATISTICS_update (_GSS_statistics,
3728  "# of total union operations",
3729  1,
3730  GNUNET_NO);
3731  {
3732  const struct StrataEstimator *se;
3733  struct GNUNET_MQ_Envelope *ev;
3734  struct StrataEstimatorMessage *strata_msg;
3735  char *buf;
3736  size_t len;
3737  uint16_t type;
3738 
3739  op->se = strata_estimator_dup (op->set->se);
3741  GNUNET_NO);
3742  op->salt_receive = op->salt_send = 42; // FIXME?????
3745  op->key_to_element);
3746 
3747  /* kick off the operation */
3748  se = op->se;
3749  buf = GNUNET_malloc (se->strata_count * IBF_BUCKET_SIZE * se->ibf_size);
3750  len = strata_estimator_write (se,
3751  buf);
3752  perf_rtt.se.sent += 1;
3754 
3755  if (len < se->strata_count * IBF_BUCKET_SIZE * se->ibf_size)
3757  else
3759  ev = GNUNET_MQ_msg_extra (strata_msg,
3760  len,
3761  type);
3762  GNUNET_memcpy (&strata_msg[1],
3763  buf,
3764  len);
3765  GNUNET_free (buf);
3766  strata_msg->set_size
3768  op->set->content->elements));
3769  GNUNET_MQ_send (op->mq,
3770  ev);
3771  op->phase = PHASE_EXPECT_IBF;
3772  }
3773  /* Now allow CADET to continue, as we did not do this in
3774  #handle_incoming_msg (as we wanted to first see if the
3775  local client would accept the request). */
3778 }
3779 
3780 
3786 static void
3787 shutdown_task (void *cls)
3788 {
3789  /* Delay actual shutdown to allow service to disconnect clients */
3791  if (0 == num_clients)
3792  {
3793  if (NULL != cadet)
3794  {
3795  GNUNET_CADET_disconnect (cadet);
3796  cadet = NULL;
3797  }
3798  }
3799  GNUNET_STATISTICS_destroy (_GSS_statistics,
3800  GNUNET_YES);
3802  "handled shutdown request\n");
3804  "RTT:%f\n", calculate_perf_rtt());
3805 }
3806 
3807 
3816 static void
3817 run (void *cls,
3818  const struct GNUNET_CONFIGURATION_Handle *cfg,
3820 {
3821  /* FIXME: need to modify SERVICE (!) API to allow
3822  us to run a shutdown task *after* clients were
3823  forcefully disconnected! */
3825  NULL);
3826  _GSS_statistics = GNUNET_STATISTICS_create ("setu",
3827  cfg);
3828  cadet = GNUNET_CADET_connect (cfg);
3829  if (NULL == cadet)
3830  {
3832  _ ("Could not connect to CADET service\n"));
3834  return;
3835  }
3836 }
3837 
3838 
3843  "set",
3845  &run,
3848  NULL,
3849  GNUNET_MQ_hd_fixed_size (client_accept,
3852  NULL),
3853  GNUNET_MQ_hd_var_size (client_set_add,
3856  NULL),
3857  GNUNET_MQ_hd_fixed_size (client_create_set,
3860  NULL),
3861  GNUNET_MQ_hd_var_size (client_evaluate,
3864  NULL),
3865  GNUNET_MQ_hd_fixed_size (client_listen,
3868  NULL),
3869  GNUNET_MQ_hd_fixed_size (client_reject,
3872  NULL),
3873  GNUNET_MQ_hd_fixed_size (client_cancel,
3876  NULL),
3878 
3879 
3880 /* end of gnunet-service-setu.c */
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
Context for op_get_element_iterator.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
uint32_t offset
Offset of the strata in the rest of the message.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
struct Set * prev
Sets are held in a doubly linked list.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet...
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
struct perf_num_send_resived_msg ibf
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:55
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:775
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
static void done()
struct Set * set
Set, if associated with the client, otherwise NULL.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
struct StrataEstimator * se
Copy of the set&#39;s strata estimator at the time of creation of this operation.
After sending the full set, wait for responses with the elements that the local peer is missing...
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
State we keep per client.
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
struct perf_num_send_resived_msg element
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
enum IntersectionOperationPhase phase
Current state of the operation.
static unsigned int element_size
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
uint8_t reserved1
Padding, must be 0.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
static int byzantine
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
struct Operation * op
Operation for which the elements should be sent.
uint32_t element_count
For Intersection: my element count.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Opaque handle to the service.
Definition: cadet_api.c:38
Handle to a service.
Definition: service.c:116
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct...
int byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct GNUNET_HashCode app_id
Application id.
Definition: setu.h:198
Element should be added to the result set of the remote peer, i.e.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
const void * data
Actual data of the element.
uint8_t symmetric
Also return set elements we are sending to the remote peer.
Definition: setu.h:221
uint16_t reserved
For alignment, always zero.
Definition: setu.h:291
uint32_t accept_reject_id
ID of the incoming request we want to reject.
Definition: setu.h:142
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1331
struct GNUNET_HashCode element_hash
Hash of the element.
unsigned int refcount
Number of references to the content.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
Message containing buckets of an invertible bloom filter.
struct Listener * next
Listeners are held in a doubly linked list.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
The other peer is decoding the IBF we just sent.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
uint32_t request_id
ID of the request we want to cancel.
Definition: setu.h:310
Invertible bloom filter (IBF).
Definition: ibf.h:82
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
unsigned ibf_bucket_number
Number of buckets in IBF.
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
UnionOperationPhase
Current phase we are in for a union operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
static void handle_union_p2p_request_full(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a request for full set transmission.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
uint32_t accept_reject_id
ID of the incoming request we want to accept.
Definition: setu.h:88
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
FIXME.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
struct KeyEntry * k
FIXME.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
uint64_t initial_size
Initial size of our set, just before the operation started.
Opaque handle to a channel.
Definition: cadet.h:116
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint64_t key_val
Definition: ibf.h:47
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
unsigned int ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
A listener is inhabited by a client, and waits for evaluation requests from remote peers...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
uint32_t request_id
id the result belongs to
Definition: setu.h:255
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:290
uint32_t salt_receive
Salt for the IBF we&#39;ve received and that we&#39;re currently decoding.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL)...
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a over message from a remote peer.
Added Roundtripscounter.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
uint32_t request_id
Request ID to identify responses.
Definition: setu.h:93
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:111
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:531
Handle for the service.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static unsigned int force_full
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int send_ibf(struct Operation *op, uint16_t ibf_order)
Send an ibf of appropriate size.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:266
Message sent by the client to the service to ask starting a new set to perform operations with...
Definition: setu.h:40
struct perf_num_send_resived_msg element_full
Internal representation of the hash map.
#define SE_IBF_SIZE
Size of the IBFs in the strata estimator.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
struct perf_num_send_resived_msg request_full
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up...
unsigned int generation_created
Generation in which the operation handle was created.
static unsigned int num_clients
Number of active clients.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected...
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
static pa_context * context
Pulseaudio context.
const void * data
Actual data of the element.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:910
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
A handle to a strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
struct perf_num_send_resived_msg se
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:210
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
The key entry is used to associate an ibf key with an element.
Handle to a client that is connected to a service.
Definition: service.c:250
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
uint8_t symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
Definition: setu.h:117
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:379
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:286
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
Message sent by client to service to initiate a set operation as a client (not as listener)...
Definition: setu.h:177
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation...
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
The other peer refused to do the operation with us, or something went wrong.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation&#39;s element set.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
GNUNET_SETU_Status
Status for the result callback.
struct StrataEstimator * se
The strata estimator is only generated once for each set.
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:970
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
, &#39; bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static char * value
Value of the record to add/remove.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
Information about an element element in the set.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
Success, all elements have been sent (and received).
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation&#39;s set...
uint16_t element_type
Application-specific element type.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:228
uint8_t order
Order of the whole ibf, where num_buckets = 2^order.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
Element should be added to the result set of the local peer, i.e.
struct perf_num_send_resived_msg operation_request
We sent the strata estimator, and expect an IBF.
A set that supports a specific operation with other peers.
uint32_t accept_id
ID of the to identify the request when accepting or rejecting it.
Definition: setu.h:160
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer...
struct ElementEntry * element
The actual element associated with the key.
struct perf_num_send_resived_msg over
Randomness for IVs etc.
struct perf_num_send_resived_msg demand
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:90
uint16_t status
See PRISM_STATUS_*-constants.
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
static char buf[2048]
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel&#39;s transmission window size changes.
Continuation for multi part IBFs.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set...
In the penultimate phase, we wait until all our demands are satisfied.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:227
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:356
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
unsigned int strata_count
Size of the IBF array in strata.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
Internal representation of the hash map.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
struct perf_num_send_resived_msg done
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
A 512-bit hashcode.
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:261
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:78
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2325
uint8_t force_delta
Always use delta operation instead of sending full sets, even it it&#39;s less efficient.
Definition: setu.h:204
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:876
Message handler for a specific message type.
static int res
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:216
#define LOG(kind,...)
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
int force_delta
Always use delta operation instead of sending full sets, even it it&#39;s less efficient.
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:323
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
uint64_t current_size
Current set size.
Definition: setu.h:250
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space...
Definition: gnunet_mq_lib.h:88
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:403
struct ClientState * cs
Client that owns the set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we&#39;re sending the full set...
char * getenv()
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries...
Strata estimator together with the peer&#39;s overall set size.
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:79
#define IBF_ALPHA
Number of buckets used in the ibf per estimated difference.
static struct GNUNET_SCHEDULER_Task * timeout_task
Task to be run on timeout.
Definition: gnunet-arm.c:124
Message sent by client to the service to add an element to the set.
Definition: setu.h:276
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
int client_done_sent
Did we send the client that we are done?
struct GNUNET_PeerIdentity target_peer
Peer to evaluate the operation with.
Definition: setu.h:193
Operation context used to execute a set operation.
static int sum_sent_received_bytes(int size, struct perf_num_send_resived_msg perf_rtt_struct)
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
A request for an operation with another client.
Definition: setu.h:149
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
struct perf_num_send_resived_msg offer
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation&#39;s key-to-element mapping.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static float calculate_perf_rtt()
uint16_t reserved2
Padding, must be 0.
uint32_t request_id
Id of our set to evaluate, chosen implicitly by the client when it calls GNUNET_SETU_commit().
Definition: setu.h:188
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
Allow multiple values with the same key.
unsigned int generation
First generation that includes this element.
Handle to a message queue.
Definition: mq.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calcualations.
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:167
struct perf_num_send_resived_msg inquery
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
The identity of the host (wraps the signing key of the peer).
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:165
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:888
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:105
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1031
struct GNUNET_CADET_Channel * channel
Channel to the peer.
We are decoding an IBF.
uint32_t salt
Salt used when hashing elements for this IBF.
struct Operation * ops_head
Evaluate operations are held in a linked list.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:240
configuration data
Definition: configuration.c:84
uint32_t salt
Salt to use for the operation.
struct GNUNET_SERVICE_Client * client
Client this is about.
const char * name
uint32_t client_request_id
ID used to identify an operation between service and client.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:201
struct InvertibleBloomFilter * local_ibf
The IBF with the local set&#39;s element.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
struct perf_num_send_resived_msg full_done
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:300
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET...
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
#define GNUNET_log(kind,...)
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
Entry in list of pending tasks.
Definition: scheduler.c:134
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define MAX_IBF_ORDER
The maximum size of an ibf we use is 2^(MAX_IBF_ORDER).
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
unsigned int ibf_number_buckets_per_element
Number of Element per bucket in IBF.
struct GNUNET_SET_Element element
The actual element.
Opaque handle to a port.
Definition: cadet_api.c:79
Phase that receives full set first and then sends elements that are the local peer missing...
struct Listener * prev
Listeners are held in a doubly linked list.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
Header for all communications.
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port().
Definition: cadet_api.c:808
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
uint8_t force_delta
Always use delta operation instead of sending full sets, even it it&#39;s less efficient.
Definition: setu.h:99
static unsigned int get_order_from_difference(unsigned int diff)
Compute the necessary order of an ibf from the size of the symmetric set difference.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation&#39;s elements of the specified size.
double rtt_bandwidth_tradeoff
User defined Bandwidth Round Trips Tradeoff.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:281
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
struct ClientState * cs
Client that owns the listener.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
uint32_t remote_element_count
Remote peers element count.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:45
uint32_t salt
Salt used when hashing elements for this inquiry.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:123
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
uint32_t salt_send
Salt that we&#39;re using for sending IBFs.
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:837
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service&#39;s run method to run service-specific setup code.
We sent the request message, and expect a strata estimator.
struct Listener * listener
Port this operation runs on.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2244
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
Used as a closure for sending elements with a specific IBF key.
struct GNUNET_HashCode hash
FIXME.
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:132
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
#define GNUNET_malloc(size)
Wrapper around malloc.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
uint32_t received_total
Total number of elements received from the other peer.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:53
static unsigned int force_delta
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
struct perf_rtt_struct perf_rtt
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1082
int force_full
Always send full sets, even if delta operations would be more efficient.
struct GNUNET_HashCode app_id
application id
Definition: setu.h:70
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
Element stored in a set.
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
unsigned int ibf_size
Size of each IBF stratum (in bytes)