GNUnet  0.19.5
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
30 #include "ibf.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_cadet_service.h"
37 #include <gcrypt.h>
38 #include "gnunet_setu_service.h"
39 #include "setu.h"
40 
41 #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
42 
47 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
48 
52 #define SE_STRATA_COUNT 32
53 
54 
59 #define SE_IBFS_TOTAL_SIZE 632
60 
64 #define SE_IBF_HASH_NUM 3
65 
69 #define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
70 
76 #define MAX_IBF_SIZE 1048576
77 
78 
83 #define IBF_MIN_SIZE 37
84 
89 #define DIFFERENTIAL_RTT_MEAN 3.65145
90 
95 #define SECURITY_LEVEL 80
96 
102 #define PROBABILITY_FOR_NEW_ROUND 0.15
103 
108 #define MEASURE_PERFORMANCE 0
109 
110 
115 {
120 
131 
136 
141 
146 
152 
158 
164 
170 
176 };
177 
183 {
188 
193 
198 };
199 
200 
207 struct ElementEntry
208 {
214 
220 
224  unsigned int generation;
225 
230  int remote;
231 };
232 
233 
238 struct Listener;
239 
240 
244 struct Set;
245 
246 
250 struct ClientState
251 {
255  struct Set *set;
256 
260  struct Listener *listener;
261 
266 
270  struct GNUNET_MQ_Handle *mq;
271 };
272 
273 
277 struct Operation
278 {
279 
284  struct GNUNET_PeerIdentity peer;
285 
289  uint64_t initial_size;
290 
294  struct Operation *next;
295 
299  struct Operation *prev;
300 
305 
309  struct Listener *listener;
310 
314  struct GNUNET_MQ_Handle *mq;
315 
320 
325  struct Set *set;
326 
332 
337 
342 
349 
355 
360 
365 
369  int client_done_sent;
370 
375 
379  uint32_t salt_send;
380 
384  uint32_t salt_receive;
385 
390  uint32_t received_fresh;
391 
395  uint32_t received_total;
396 
400  uint32_t salt;
401 
405  uint32_t remote_element_count;
406 
410  uint32_t client_request_id;
411 
416  int force_delta;
417 
422  int force_full;
423 
428  int byzantine;
429 
435 
441 
447  uint32_t suggest_id;
448 
453  unsigned int generation_created;
454 
455 
460 
461 
466 
467 
473 
479  uint8_t peer_site;
480 
481 
486 
491 
496 
501 
502 
507 
512 
517 
521  uint64_t remote_set_diff;
522 
526  uint64_t local_set_diff;
527 
532 };
533 
534 
539 struct SetContent
540 {
545 
550 
555 
559  unsigned int refcount;
560 
564  unsigned int latest_generation;
565 
569  int iterator_count;
570 };
571 
572 
576 struct Set
577 {
581  struct Set *next;
582 
586  struct Set *prev;
587 
592  struct ClientState *cs;
593 
598  struct SetContent *content;
599 
605 
609  struct Operation *ops_head;
610 
614  struct Operation *ops_tail;
615 
620  unsigned int current_generation;
621 
622 };
623 
624 
628 struct KeyEntry
629 {
633  struct IBF_Key ibf_key;
634 
641  struct ElementEntry *element;
642 
648  int received;
649 };
650 
651 
656 struct SendElementClosure
657 {
662  struct IBF_Key ibf_key;
663 
668  struct Operation *op;
669 };
670 
671 
676 struct Listener
677 {
681  struct Listener *next;
682 
686  struct Listener *prev;
687 
693  struct Operation *op_head;
694 
700  struct Operation *op_tail;
701 
706  struct ClientState *cs;
707 
712 
717  struct GNUNET_HashCode app_id;
718 
719 };
720 
721 
726 static struct GNUNET_CADET_Handle *cadet;
727 
732 
736 static struct Listener *listener_head;
737 
741 static struct Listener *listener_tail;
742 
746 static unsigned int num_clients;
747 
752 static int in_shutdown;
753 
759 static uint32_t suggest_id;
760 
761 #if MEASURE_PERFORMANCE
766 static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767 
768 
774 struct perf_num_send_received_msg
775 {
776  uint64_t sent;
777  uint64_t sent_var_bytes;
778  uint64_t received;
779  uint64_t received_var_bytes;
780 };
781 
785 struct per_store_struct
786 {
787  struct perf_num_send_received_msg operation_request;
788  struct perf_num_send_received_msg se;
789  struct perf_num_send_received_msg request_full;
790  struct perf_num_send_received_msg element_full;
791  struct perf_num_send_received_msg full_done;
792  struct perf_num_send_received_msg ibf;
793  struct perf_num_send_received_msg inquery;
794  struct perf_num_send_received_msg element;
795  struct perf_num_send_received_msg demand;
796  struct perf_num_send_received_msg offer;
797  struct perf_num_send_received_msg done;
798  struct perf_num_send_received_msg over;
799  uint64_t se_diff;
800  uint64_t se_diff_remote;
801  uint64_t se_diff_local;
802  uint64_t active_passive_switches;
803  uint8_t mode_of_operation;
804 };
805 
806 struct per_store_struct perf_store;
807 #endif
808 
814 {
819 
824 
829 
834 };
835 
841 {
846 
851 
856 };
857 
858 
863 {
876 };
877 
878 
879 #if MEASURE_PERFORMANCE
880 
886 static void
887 load_config (struct Operation *op)
888 {
889  long long number;
890  float fl;
891 
892  setu_cfg = GNUNET_CONFIGURATION_create ();
893  GNUNET_CONFIGURATION_load (setu_cfg,
894  "perf_setu.conf");
896  "IBF",
897  "BUCKET_NUMBER_FACTOR",
898  &fl);
899  op->ibf_bucket_number_factor = fl;
901  "IBF",
902  "NUMBER_PER_BUCKET",
903  &number);
904  op->ibf_number_buckets_per_element = number;
906  "PERFORMANCE",
907  "TRADEOFF",
908  &number);
909  op->rtt_bandwidth_tradeoff = number;
911  "BOUNDARIES",
912  "UPPER_ELEMENT",
913  &number);
914  op->byzantine_upper_bound = number;
915  op->peer_site = 0;
916 }
917 
918 
925 static int
926 sum_sent_received_bytes (uint64_t size,
927  struct perf_num_send_received_msg
928  perf_num_send_received_msg)
929 {
930  return (size * perf_num_send_received_msg.sent)
931  + (size * perf_num_send_received_msg.received)
932  + perf_num_send_received_msg.sent_var_bytes
933  + perf_num_send_received_msg.received_var_bytes;
934 }
935 
936 
940 static void
941 calculate_perf_store ()
942 {
943 
947  float rtt = 1;
948  int bytes_transmitted = 0;
949 
953  if ((perf_store.element_full.received != 0) ||
954  (perf_store.element_full.sent != 0)
955  )
956  rtt += 1;
957 
958  if ((perf_store.request_full.received != 0) ||
959  (perf_store.request_full.sent != 0)
960  )
961  rtt += 0.5;
962 
967  if ((perf_store.element.received != 0) ||
968  (perf_store.element.sent != 0))
969  {
970  int iterations = perf_store.active_passive_switches;
971 
972  if (iterations > 0)
973  rtt += iterations * 0.5;
974  rtt += 2.5;
975  }
976 
977 
981  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983  perf_store.request_full);
984 
985  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987  perf_store.element_full);
988  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
990  perf_store.element);
991  // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
994  perf_store.se);
995  bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996  bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997  perf_store.ibf);
998  bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999  perf_store.inquery);
1000  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1002  perf_store.demand);
1003  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1005  perf_store.offer);
1006  bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007 
1011  float factor;
1012  GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013  &factor);
1014  long long num_per_bucket;
1015  GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016  &num_per_bucket);
1017 
1018 
1019  int decoded = 0;
1020  if (perf_store.active_passive_switches == 0)
1021  decoded = 1;
1022  int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023  IBFMessage),
1024  perf_store.ibf);
1025 
1026  FILE *out1 = fopen ("perf_data.csv", "a");
1027  fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028  decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029  bytes_transmitted,
1030  perf_store.se_diff_local,perf_store.se_diff_remote,
1031  perf_store.mode_of_operation);
1032  fclose (out1);
1033 
1034 }
1035 
1036 
1037 #endif
1050 static uint8_t
1051 estimate_best_mode_of_operation (uint64_t avg_element_size,
1052  uint64_t local_set_size,
1053  uint64_t remote_set_size,
1054  uint64_t est_set_diff_remote,
1055  uint64_t est_set_diff_local,
1056  uint64_t bandwith_latency_tradeoff,
1057  uint64_t ibf_bucket_number_factor)
1058 {
1059 
1060  /*
1061  * In case of initial sync fall to predefined states
1062  */
1063 
1064  if (0 == local_set_size)
1066  if (0 == remote_set_size)
1068 
1069  /*
1070  * Calculate bytes for full Sync
1071  */
1072 
1073  uint8_t sizeof_full_done_header = 4;
1074  uint8_t sizeof_done_header = 4;
1075  uint8_t rtt_min_full = 2;
1076  uint8_t sizeof_request_full = 4;
1077  uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078 
1079  /* Estimate byte required if we send first */
1080  uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081  + local_set_size;
1082 
1083  uint64_t total_bytes_full_local_send_first = (avg_element_size
1084  *
1085  total_elements_to_send_local_send_first) \
1086  + (
1087  total_elements_to_send_local_send_first * sizeof(struct
1089  + (sizeof_full_done_header * 2) \
1090  + rtt_min_full
1091  * bandwith_latency_tradeoff;
1092 
1093  /* Estimate bytes required if we request from remote peer */
1094  uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095  + remote_set_size;
1096 
1097  uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098  *
1099  total_elements_to_send_remote_send_first) \
1100  + (
1101  total_elements_to_send_remote_send_first * sizeof(struct
1103  + (sizeof_full_done_header * 2) \
1104  + (rtt_min_full + 0.5)
1105  * bandwith_latency_tradeoff \
1106  + sizeof_request_full;
1107 
1108  /*
1109  * Calculate bytes for differential Sync
1110  */
1111 
1112  /* Estimate bytes required by IBF transmission*/
1113 
1114  long double ibf_bucket_count = estimated_total_diff
1115  * ibf_bucket_number_factor;
1116 
1117  if (ibf_bucket_count <= IBF_MIN_SIZE)
1118  {
1119  ibf_bucket_count = IBF_MIN_SIZE;
1120  }
1121  uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122  / ((float) MAX_BUCKETS_PER_MESSAGE));
1123 
1124  uint64_t estimated_counter_size = ceil (
1125  MIN (2 * log2l (((float) local_set_size)
1126  / ((float) ibf_bucket_count)),
1127  log2l (local_set_size)));
1128 
1129  long double counter_bytes = (float) estimated_counter_size / 8;
1130 
1131  uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132  * 1.2 \
1133  + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134  + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135  * 1.2 \
1136  + (ibf_bucket_count * counter_bytes) * 1.2);
1137 
1138  /* Estimate full byte count for differential sync */
1139  uint64_t element_size = (avg_element_size
1140  + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141  * estimated_total_diff;
1142  uint64_t done_size = sizeof_done_header;
1143  uint64_t inquery_size = (sizeof (struct IBF_Key)
1144  + sizeof (struct InquiryMessage))
1145  * estimated_total_diff;
1146  uint64_t demand_size =
1147  (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148  * estimated_total_diff;
1149  uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150  + sizeof (struct GNUNET_MessageHeader))
1151  * estimated_total_diff;
1152 
1153  uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154  + demand_size + offer_size + ibf_bytes) \
1156  * bandwith_latency_tradeoff);
1157 
1158  uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159  total_bytes_full_remote_send_first);
1160 
1161  /* Decide between full and differential sync */
1162 
1163  if (full_min < total_bytes_diff)
1164  {
1165  /* Decide between sending all element first or receiving all elements */
1166  if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167  {
1169  }
1170  else
1171  {
1173  }
1174  }
1175  else
1176  {
1177  return DIFFERENTIAL_SYNC;
1178  }
1179 }
1180 
1181 
1190 static enum GNUNET_GenericReturnValue
1191 check_valid_phase (const uint8_t allowed_phases[],
1192  size_t size_phases,
1193  struct Operation *op)
1194 {
1198  for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199  {
1200  uint8_t phase = allowed_phases[phase_ctr];
1201  if (phase == op->phase)
1202  {
1204  "Message received in valid phase\n");
1205  return GNUNET_YES;
1206  }
1207  }
1209  "Received message in invalid phase: %u\n", op->phase);
1210  return GNUNET_NO;
1211 }
1212 
1213 
1225 static int
1227  enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228  const struct GNUNET_HashCode *hash_code,
1229  enum MESSAGE_TYPE mt)
1230 {
1231  struct messageControlFlowElement *cfe = NULL;
1232  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233 
1238  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239  if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240  {
1241  if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242  {
1244  "Received an element without sent offer!\n");
1245  return GNUNET_NO;
1246  }
1247  /* Check that only requested elements are received! */
1248  if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249  MSG_CFS_SENT))
1250  {
1252  "Received an element that was not demanded\n");
1253  return GNUNET_NO;
1254  }
1255  }
1256 
1261  if (NULL == cfe)
1262  {
1263  cfe = GNUNET_new (struct messageControlFlowElement);
1264  if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265  cfe,
1267  {
1268  GNUNET_free (cfe);
1269  return GNUNET_SYSERR;
1270  }
1271  }
1272 
1277  if (OFFER_MESSAGE == mt)
1278  {
1279  mcfs = &cfe->offer;
1280  }
1281  else if (DEMAND_MESSAGE == mt)
1282  {
1283  mcfs = &cfe->demand;
1284  }
1285  else if (ELEMENT_MESSAGE == mt)
1286  {
1287  mcfs = &cfe->element;
1288  }
1289  else
1290  {
1291  return GNUNET_SYSERR;
1292  }
1293 
1298  if (new_mcfs <= *mcfs)
1299  {
1300  return GNUNET_NO;
1301  }
1302 
1303  *mcfs = new_mcfs;
1304  return GNUNET_YES;
1305 }
1306 
1307 
1315 static int
1318  struct GNUNET_HashCode *hash_code,
1319  enum MESSAGE_TYPE mt)
1320 {
1321  struct messageControlFlowElement *cfe = NULL;
1322  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323 
1324  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325 
1330  if (cfe != NULL)
1331  {
1332  if (OFFER_MESSAGE == mt)
1333  {
1334  mcfs = &cfe->offer;
1335  }
1336  else if (DEMAND_MESSAGE == mt)
1337  {
1338  mcfs = &cfe->demand;
1339  }
1340  else if (ELEMENT_MESSAGE == mt)
1341  {
1342  mcfs = &cfe->element;
1343  }
1344  else
1345  {
1346  return GNUNET_SYSERR;
1347  }
1348 
1352  if (*mcfs != MSG_CFS_UNINITIALIZED)
1353  {
1354  return GNUNET_YES;
1355  }
1356  }
1357  return GNUNET_NO;
1358 }
1359 
1360 
1371 static int
1373  const struct GNUNET_HashCode *key,
1374  void *value)
1375 {
1376  struct messageControlFlowElement *mcfe = value;
1377 
1378  if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379  {
1380  return GNUNET_YES;
1381  }
1382  return GNUNET_NO;
1383 }
1384 
1385 
1395 static int
1397  const struct GNUNET_HashCode *key,
1398  void *value)
1399 {
1400  struct Operation *op = cls;
1401  struct GNUNET_SETU_Element *element = value;
1402  op->total_elements_size_local += element->size;
1403  return GNUNET_YES;
1404 }
1405 
1406 
1416 static int
1418  const struct GNUNET_HashCode *key,
1419  void *value)
1420 {
1421  struct Operation *op = cls;
1422 
1423  struct GNUNET_HashContext *hashed_key_context =
1425  struct GNUNET_HashCode new_key;
1426 
1430  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431  &key,
1432  sizeof(struct IBF_Key));
1433  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434  &op->set->content->elements_randomized_salt,
1435  sizeof(uint32_t));
1436  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437  &new_key);
1438  GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439  &new_key,value,
1441  return GNUNET_YES;
1442 }
1443 
1444 
1455 static int
1457  uint32_t key,
1458  void *value)
1459 {
1460  struct KeyEntry *k = value;
1461 
1462  GNUNET_assert (NULL != k);
1463  if (GNUNET_YES == k->element->remote)
1464  {
1465  GNUNET_free (k->element);
1466  k->element = NULL;
1467  }
1468  GNUNET_free (k);
1469  return GNUNET_YES;
1470 }
1471 
1472 
1479 static void
1480 send_client_done (void *cls)
1481 {
1482  struct Operation *op = cls;
1483  struct GNUNET_MQ_Envelope *ev;
1484  struct GNUNET_SETU_ResultMessage *rm;
1485 
1486  if (GNUNET_YES == op->client_done_sent)
1487  return;
1488  if (PHASE_FINISHED != op->phase)
1489  {
1491  "Union operation failed\n");
1493  "# Union operations failed",
1494  1,
1495  GNUNET_NO);
1498  rm->request_id = htonl (op->client_request_id);
1499  rm->element_type = htons (0);
1500  GNUNET_MQ_send (op->set->cs->mq,
1501  ev);
1502  return;
1503  }
1504 
1505  op->client_done_sent = GNUNET_YES;
1506 
1508  "# Union operations succeeded",
1509  1,
1510  GNUNET_NO);
1512  "Signalling client that union operation is done\n");
1513  ev = GNUNET_MQ_msg (rm,
1515  rm->request_id = htonl (op->client_request_id);
1516  rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1517  rm->element_type = htons (0);
1519  op->key_to_element));
1520  GNUNET_MQ_send (op->set->cs->mq,
1521  ev);
1522 }
1523 
1524 
1531 static int
1533 {
1534  if (op->byzantine != GNUNET_YES)
1535  return GNUNET_OK;
1536 
1540  if (op->remote_element_count + op->remote_set_diff >
1541  op->byzantine_upper_bound)
1542  return GNUNET_SYSERR;
1543  if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544  return GNUNET_SYSERR;
1545 
1549  if (op->remote_element_count < op->byzantine_lower_bound)
1550  return GNUNET_SYSERR;
1551  return GNUNET_OK;
1552 }
1553 
1554 
1555 static enum GNUNET_GenericReturnValue
1556 free_values_iter(void *cls,
1557  const struct GNUNET_HashCode *key,
1558  void *value)
1559 {
1560  GNUNET_free (value);
1561  return GNUNET_YES;
1562 }
1563 
1564 
1565 /* FIXME: the destroy logic is a mess and should be cleaned up! */
1566 
1579 static void
1581 {
1582  struct Set *set = op->set;
1583  struct GNUNET_CADET_Channel *channel;
1584 
1586  "Destroying union operation %p\n",
1587  op);
1588  GNUNET_assert (NULL == op->listener);
1589  /* check if the op was canceled twice */
1590  if (NULL != op->remote_ibf)
1591  {
1592  ibf_destroy (op->remote_ibf);
1593  op->remote_ibf = NULL;
1594  }
1595  if (NULL != op->demanded_hashes)
1596  {
1597  GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1598  op->demanded_hashes = NULL;
1599  }
1600  if (NULL != op->local_ibf)
1601  {
1602  ibf_destroy (op->local_ibf);
1603  op->local_ibf = NULL;
1604  }
1605  if (NULL != op->se)
1606  {
1608  op->se = NULL;
1609  }
1610  if (NULL != op->key_to_element)
1611  {
1614  NULL);
1615  GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
1616  op->key_to_element = NULL;
1617  }
1618  if (NULL != op->message_control_flow)
1619  {
1620  GNUNET_CONTAINER_multihashmap_iterate (op->message_control_flow,
1622  NULL);
1623  GNUNET_CONTAINER_multihashmap_destroy (op->message_control_flow);
1624  op->message_control_flow = NULL;
1625  }
1626  if (NULL != op->inquiries_sent)
1627  {
1628  GNUNET_CONTAINER_multihashmap_destroy (op->inquiries_sent);
1629  op->inquiries_sent = NULL;
1630  }
1631  if (NULL != set)
1632  {
1634  set->ops_tail,
1635  op);
1636  op->set = NULL;
1637  }
1638  if (NULL != op->context_msg)
1639  {
1640  GNUNET_free (op->context_msg);
1641  op->context_msg = NULL;
1642  }
1643  if (NULL != (channel = op->channel))
1644  {
1645  /* This will free op; called conditionally as this helper function
1646  is also called from within the channel disconnect handler. */
1647  op->channel = NULL;
1648  GNUNET_CADET_channel_destroy (channel);
1649  }
1650  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1651  * there was a channel end handler that will free 'op' on the call stack. */
1652 }
1653 
1654 
1660 static void
1662 
1663 
1669 static void
1671 {
1672  struct Listener *listener;
1673 
1675  "Destroying incoming operation %p\n",
1676  op);
1677  if (NULL != (listener = op->listener))
1678  {
1680  listener->op_tail,
1681  op);
1682  op->listener = NULL;
1683  }
1684  if (NULL != op->timeout_task)
1685  {
1686  GNUNET_SCHEDULER_cancel (op->timeout_task);
1687  op->timeout_task = NULL;
1688  }
1690 }
1691 
1692 
1698 static void
1700 {
1701  struct GNUNET_CADET_Channel *channel;
1702 
1703  if (NULL != (channel = op->channel))
1704  {
1705  /* This will free op; called conditionally as this helper function
1706  is also called from within the channel disconnect handler. */
1707  op->channel = NULL;
1708  GNUNET_CADET_channel_destroy (channel);
1709  }
1710  if (NULL != op->listener)
1711  {
1712  incoming_destroy (op);
1713  return;
1714  }
1715  if (NULL != op->set)
1716  send_client_done (op);
1718  GNUNET_free (op);
1719 }
1720 
1721 
1728 static void
1730 {
1731  struct GNUNET_MQ_Envelope *ev;
1733 
1735  "union operation failed\n");
1737  msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1738  msg->request_id = htonl (op->client_request_id);
1739  msg->element_type = htons (0);
1740  GNUNET_MQ_send (op->set->cs->mq,
1741  ev);
1743 }
1744 
1745 
1756 static void
1758 {
1759  if (GNUNET_YES != op->byzantine)
1760  return;
1761 
1762  int security_level_lb = -1 * SECURITY_LEVEL;
1763  uint64_t duplicates = op->received_fresh - op->received_total;
1764 
1765  /*
1766  * Protect full sync from receiving double element when in FULL SENDING
1767  */
1768  if (PHASE_FULL_SENDING == op->phase)
1769  {
1770  if (duplicates > 0)
1771  {
1773  "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1774  "mode of operation this is not allowed! Duplicates: %llu\n",
1775  (unsigned long long) duplicates);
1776  GNUNET_break_op (0);
1778  return;
1779  }
1780 
1781  }
1782 
1783  /*
1784  * Protect full sync with probabilistic algorithm
1785  */
1786  if (PHASE_FULL_RECEIVING == op->phase)
1787  {
1788  if (0 == op->remote_set_diff)
1789  op->remote_set_diff = 1;
1790 
1791  long double base = (1 - (long double) (op->remote_set_diff
1792  / (long double) (op->initial_size
1793  + op->
1794  remote_set_diff)));
1795  long double exponent = (op->received_total - (op->received_fresh * ((long
1796  double)
1797  op->
1798  initial_size
1799  / (long
1800  double)
1801  op->
1802  remote_set_diff)));
1803  long double value = exponent * (log2l (base) / log2l (2));
1804  if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1805  {
1807  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1808  "to many duplicated full element : %LF\n",
1809  value);
1810  GNUNET_break_op (0);
1812  return;
1813  }
1814  }
1815 }
1816 
1817 
1822 static void
1824 {
1825  double probability = op->differential_sync_iterations * (log2l (
1827  / log2l (2));
1828  if ((-1 * SECURITY_LEVEL) > probability)
1829  {
1831  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1832  "switches in differential sync: %u\n",
1833  op->differential_sync_iterations);
1834  GNUNET_break_op (0);
1836  return;
1837  }
1838 }
1839 
1840 
1848 static struct IBF_Key
1849 get_ibf_key (const struct GNUNET_HashCode *src)
1850 {
1851  struct IBF_Key key;
1852  uint16_t salt = 0;
1853 
1855  GNUNET_CRYPTO_kdf (&key, sizeof(key),
1856  src, sizeof *src,
1857  &salt, sizeof(salt),
1858  NULL, 0));
1859  return key;
1860 }
1861 
1862 
1866 struct GetElementContext
1867 {
1871  struct GNUNET_HashCode hash;
1872 
1876  struct KeyEntry *k;
1877 };
1878 
1879 
1890 static int
1892  uint32_t key,
1893  void *value)
1894 {
1895  struct GetElementContext *ctx = cls;
1896  struct KeyEntry *k = value;
1897 
1898  GNUNET_assert (NULL != k);
1900  &ctx->hash))
1901  {
1902  ctx->k = k;
1903  return GNUNET_NO;
1904  }
1905  return GNUNET_YES;
1906 }
1907 
1908 
1917 static struct KeyEntry *
1919  const struct GNUNET_HashCode *element_hash)
1920 {
1921  int ret;
1922  struct IBF_Key ibf_key;
1923  struct GetElementContext ctx = { { { 0 } }, 0 };
1924 
1925  ctx.hash = *element_hash;
1926 
1927  ibf_key = get_ibf_key (element_hash);
1929  (uint32_t) ibf_key.key_val,
1931  &ctx);
1932 
1933  /* was the iteration aborted because we found the element? */
1934  if (GNUNET_SYSERR == ret)
1935  {
1936  GNUNET_assert (NULL != ctx.k);
1937  return ctx.k;
1938  }
1939  return NULL;
1940 }
1941 
1942 
1957 static void
1959  struct ElementEntry *ee,
1960  int received)
1961 {
1962  struct IBF_Key ibf_key;
1963  struct KeyEntry *k;
1964 
1965  ibf_key = get_ibf_key (&ee->element_hash);
1966  k = GNUNET_new (struct KeyEntry);
1967  k->element = ee;
1968  k->ibf_key = ibf_key;
1969  k->received = received;
1971  GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1972  (uint32_t) ibf_key.key_val,
1973  k,
1975 }
1976 
1977 
1982 static void
1983 salt_key (const struct IBF_Key *k_in,
1984  uint32_t salt,
1985  struct IBF_Key *k_out)
1986 {
1987  int s = (salt * 7) % 64;
1988  uint64_t x = k_in->key_val;
1989 
1990  /* rotate ibf key */
1991  x = (x >> s) | (x << (64 - s));
1992  k_out->key_val = x;
1993 }
1994 
1995 
1999 static void
2000 unsalt_key (const struct IBF_Key *k_in,
2001  uint32_t salt,
2002  struct IBF_Key *k_out)
2003 {
2004  int s = (salt * 7) % 64;
2005  uint64_t x = k_in->key_val;
2006 
2007  x = (x << s) | (x >> (64 - s));
2008  k_out->key_val = x;
2009 }
2010 
2011 
2019 static int
2021  uint32_t key,
2022  void *value)
2023 {
2024  struct Operation *op = cls;
2025  struct KeyEntry *ke = value;
2026  struct IBF_Key salted_key;
2027 
2029  "[OP %p] inserting %lx (hash %s) into ibf\n",
2030  op,
2031  (unsigned long) ke->ibf_key.key_val,
2032  GNUNET_h2s (&ke->element->element_hash));
2033  salt_key (&ke->ibf_key,
2034  op->salt_send,
2035  &salted_key);
2036  ibf_insert (op->local_ibf, salted_key);
2037  return GNUNET_YES;
2038 }
2039 
2040 
2048 static int
2050  struct Operation *op)
2051 {
2052  return ee->generation >= op->generation_created;
2053 }
2054 
2055 
2066 static int
2068  const struct GNUNET_HashCode *key,
2069  void *value)
2070 {
2071  struct Operation *op = cls;
2072  struct ElementEntry *ee = value;
2073 
2074  /* make sure that the element belongs to the set at the time
2075  * of creating the operation */
2076  if (GNUNET_NO ==
2078  op))
2079  return GNUNET_YES;
2080  GNUNET_assert (GNUNET_NO == ee->remote);
2082  ee,
2083  GNUNET_NO);
2084  return GNUNET_YES;
2085 }
2086 
2087 
2093 static void
2095 {
2096  unsigned int len;
2097 
2098  GNUNET_assert (NULL == op->key_to_element);
2099  len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2100  op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2101  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2103  op);
2104 }
2105 
2106 
2115 static int
2117  uint32_t size)
2118 {
2119  GNUNET_assert (NULL != op->key_to_element);
2120 
2121  if (NULL != op->local_ibf)
2122  ibf_destroy (op->local_ibf);
2123  // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2124  op->local_ibf = ibf_create (size,
2125  ((uint8_t) op->ibf_number_buckets_per_element));
2126  if (NULL == op->local_ibf)
2127  {
2129  "Failed to allocate local IBF\n");
2130  return GNUNET_SYSERR;
2131  }
2134  op);
2135  return GNUNET_OK;
2136 }
2137 
2138 
2148 static int
2150  uint32_t ibf_size)
2151 {
2152  uint64_t buckets_sent = 0;
2153  struct InvertibleBloomFilter *ibf;
2154  op->differential_sync_iterations++;
2155 
2159  uint32_t ibf_min_size = IBF_MIN_SIZE;
2160 
2161  if (ibf_size < ibf_min_size)
2162  {
2163  ibf_size = ibf_min_size;
2164  }
2165  if (GNUNET_OK !=
2166  prepare_ibf (op, ibf_size))
2167  {
2168  /* allocation failed */
2169  return GNUNET_SYSERR;
2170  }
2171 
2173  "sending ibf of size %u\n",
2174  (unsigned int) ibf_size);
2175 
2176  {
2177  char name[64];
2178 
2180  sizeof(name),
2181  "# sent IBF (order %u)",
2182  ibf_size);
2184  }
2185 
2186  ibf = op->local_ibf;
2187 
2188  while (buckets_sent < ibf_size)
2189  {
2190  unsigned int buckets_in_message;
2191  struct GNUNET_MQ_Envelope *ev;
2192  struct IBFMessage *msg;
2193 
2194  buckets_in_message = ibf_size - buckets_sent;
2195  /* limit to maximum */
2196  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2197  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2198 
2199 #if MEASURE_PERFORMANCE
2200  perf_store.ibf.sent += 1;
2201  perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2202 #endif
2203  ev = GNUNET_MQ_msg_extra (msg,
2204  buckets_in_message * IBF_BUCKET_SIZE,
2206  msg->ibf_size = ibf_size;
2207  msg->offset = htonl (buckets_sent);
2208  msg->salt = htonl (op->salt_send);
2209  msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2210 
2211 
2212  ibf_write_slice (ibf, buckets_sent,
2213  buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2214  buckets_sent += buckets_in_message;
2216  "ibf chunk size %u, %llu/%u sent\n",
2217  (unsigned int) buckets_in_message,
2218  (unsigned long long) buckets_sent,
2219  (unsigned int) ibf_size);
2220  GNUNET_MQ_send (op->mq, ev);
2221  }
2222 
2223  /* The other peer must decode the IBF, so
2224  * we're passive. */
2225  op->phase = PHASE_PASSIVE_DECODING;
2226  return GNUNET_OK;
2227 }
2228 
2229 
2237 static unsigned int
2238 get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2239  float ibf_bucket_number_factor)
2240 {
2243  return (((int) (diff * ibf_bucket_number_factor)) | 1);
2244 
2245 }
2246 
2247 
2248 static unsigned int
2249 get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
2250  decoded_elements, unsigned int last_ibf_size)
2251 {
2252  unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2253  - (ibf_bucket_number_factor
2254  * decoded_elements));
2257  return next_size | 1;
2258 }
2259 
2260 
2270 static int
2272  const struct GNUNET_HashCode *key,
2273  void *value)
2274 {
2275  struct Operation *op = cls;
2276  struct GNUNET_SETU_ElementMessage *emsg;
2277  struct ElementEntry *ee = value;
2278  struct GNUNET_SETU_Element *el = &ee->element;
2279  struct GNUNET_MQ_Envelope *ev;
2280 
2282  "Sending element %s\n",
2283  GNUNET_h2s (key));
2284 #if MEASURE_PERFORMANCE
2285  perf_store.element_full.received += 1;
2286  perf_store.element_full.received_var_bytes += el->size;
2287 #endif
2288  ev = GNUNET_MQ_msg_extra (emsg,
2289  el->size,
2291  emsg->element_type = htons (el->element_type);
2292  GNUNET_memcpy (&emsg[1],
2293  el->data,
2294  el->size);
2295  GNUNET_MQ_send (op->mq,
2296  ev);
2297  return GNUNET_YES;
2298 }
2299 
2300 
2306 static void
2308 {
2309  struct GNUNET_MQ_Envelope *ev;
2310 
2311  op->phase = PHASE_FULL_SENDING;
2313  "Dedicing to transmit the full set\n");
2314  /* FIXME: use a more memory-friendly way of doing this with an
2315  iterator, just as we do in the non-full case! */
2316 
2317  // Randomize Elements to send
2318  op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2319  32,GNUNET_NO);
2320  op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2322  UINT64_MAX);
2323  (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2324  &
2326  op);
2327 
2329  op->set->content->elements_randomized,
2331  op);
2332 #if MEASURE_PERFORMANCE
2333  perf_store.full_done.sent += 1;
2334 #endif
2336  GNUNET_MQ_send (op->mq,
2337  ev);
2338 }
2339 
2340 
2347 static int
2349  const struct StrataEstimatorMessage *msg)
2350 {
2351  struct Operation *op = cls;
2352  int is_compressed;
2353  size_t len;
2354 
2355  if (op->phase != PHASE_EXPECT_SE)
2356  {
2357  GNUNET_break (0);
2358  return GNUNET_SYSERR;
2359  }
2360  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2361  msg->header.type));
2362  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2363  if ((GNUNET_NO == is_compressed) &&
2365  {
2366  GNUNET_break (0);
2367  return GNUNET_SYSERR;
2368  }
2369  return GNUNET_OK;
2370 }
2371 
2372 
2379 static void
2381  const struct StrataEstimatorMessage *msg)
2382 {
2383 #if MEASURE_PERFORMANCE
2384  perf_store.se.received += 1;
2385  perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2387 #endif
2388  struct Operation *op = cls;
2389  struct MultiStrataEstimator *remote_se;
2390  unsigned int diff;
2391  uint64_t other_size;
2392  size_t len;
2393  int is_compressed;
2394  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2395  op->set->content->elements);
2396  // Setting peer site to receiving peer
2397  op->peer_site = 1;
2398 
2402  uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2403  if (GNUNET_OK !=
2404  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2405  {
2406  GNUNET_break (0);
2408  return;
2409  }
2410 
2412  if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2413  {
2415  "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2416  msg->se_count);
2417  GNUNET_break_op (0);
2419  return;
2420  }
2421 
2422  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2423  msg->header.type));
2425  "# bytes of SE received",
2426  ntohs (msg->header.size),
2427  GNUNET_NO);
2428  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2429  other_size = GNUNET_ntohll (msg->set_size);
2430  op->remote_element_count = other_size;
2431 
2432  if (op->byzantine_upper_bound < op->remote_element_count)
2433  {
2435  "Exceeded configured upper bound <%lu> of element: %u\n",
2436  op->byzantine_upper_bound,
2437  op->remote_element_count);
2439  return;
2440  }
2441 
2444  SE_IBF_HASH_NUM);
2445  if (NULL == remote_se)
2446  {
2447  /* insufficient resources, fail */
2449  return;
2450  }
2451  if (GNUNET_OK !=
2453  len,
2454  is_compressed,
2455  msg->se_count,
2457  remote_se))
2458  {
2459  /* decompression failed */
2460  strata_estimator_destroy (remote_se);
2462  return;
2463  }
2464  GNUNET_assert (NULL != op->se);
2465  strata_estimator_difference (remote_se,
2466  op->se);
2467 
2468  /* Calculate remote local diff */
2469  long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2470  long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2471 
2472  /* Prevent estimations from overshooting max element */
2473  if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2474  diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2475  if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2476  diff_local = op->byzantine_upper_bound - op->local_element_count;
2477  if ((diff_remote < 0) || (diff_local < 0))
2478  {
2479  strata_estimator_destroy (remote_se);
2481  "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2482  "malicious: remote diff %ld, local diff: %ld\n",
2483  diff_remote, diff_local);
2484  GNUNET_break_op (0);
2486  return;
2487  }
2488 
2489  /* Make estimation more precise in initial sync cases */
2490  if (0 == op->remote_element_count)
2491  {
2492  diff_remote = 0;
2493  diff_local = op->local_element_count;
2494  }
2495  if (0 == op->local_element_count)
2496  {
2497  diff_local = 0;
2498  diff_remote = op->remote_element_count;
2499  }
2500 
2501  diff = diff_remote + diff_local;
2502  op->remote_set_diff = diff_remote;
2503 
2505  uint64_t avg_element_size = 0;
2506  if (0 < op->local_element_count)
2507  {
2508  op->total_elements_size_local = 0;
2509  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2510  &
2512  op);
2513  avg_element_size = op->total_elements_size_local / op->local_element_count;
2514  }
2515 
2516  op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2518  op->set->content->
2519  elements),
2520  op->
2521  remote_element_count,
2522  diff_remote,
2523  diff_local,
2524  op->
2525  rtt_bandwidth_tradeoff,
2526  op->
2527  ibf_bucket_number_factor);
2528 
2529 #if MEASURE_PERFORMANCE
2530  perf_store.se_diff_local = diff_local;
2531  perf_store.se_diff_remote = diff_remote;
2532  perf_store.se_diff = diff;
2533  perf_store.mode_of_operation = op->mode_of_operation;
2534 #endif
2535 
2536  strata_estimator_destroy (remote_se);
2538  op->se = NULL;
2540  "got se diff=%d, using ibf size %d\n",
2541  diff,
2542  1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2543  op->ibf_bucket_number_factor));
2544 
2545  {
2546  char *set_debug;
2547 
2548  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2549  if ((NULL != set_debug) &&
2550  (0 == strcmp (set_debug, "1")))
2551  {
2552  FILE *f = fopen ("set.log", "a");
2553  fprintf (f, "%llu\n", (unsigned long long) diff);
2554  fclose (f);
2555  }
2556  }
2557 
2558  if ((GNUNET_YES == op->byzantine) &&
2559  (other_size < op->byzantine_lower_bound))
2560  {
2561  GNUNET_break (0);
2563  return;
2564  }
2565 
2566  if ((GNUNET_YES == op->force_full) ||
2567  (op->mode_of_operation != DIFFERENTIAL_SYNC))
2568  {
2570  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2571  diff,
2572  (unsigned long long) op->initial_size);
2574  "# of full sends",
2575  1,
2576  GNUNET_NO);
2577  if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2578  {
2579  struct TransmitFullMessage *signal_msg;
2580  struct GNUNET_MQ_Envelope *ev;
2581  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2583  signal_msg->remote_set_difference = htonl (diff_local);
2584  signal_msg->remote_set_size = htonl (op->local_element_count);
2585  signal_msg->local_set_difference = htonl (diff_remote);
2586  GNUNET_MQ_send (op->mq,
2587  ev);
2588  send_full_set (op);
2589  }
2590  else
2591  {
2592  struct GNUNET_MQ_Envelope *ev;
2593 
2595  "Telling other peer that we expect its full set\n");
2596  op->phase = PHASE_FULL_RECEIVING;
2597 #if MEASURE_PERFORMANCE
2598  perf_store.request_full.sent += 1;
2599 #endif
2600  struct TransmitFullMessage *signal_msg;
2601  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2603  signal_msg->remote_set_difference = htonl (diff_local);
2604  signal_msg->remote_set_size = htonl (op->local_element_count);
2605  signal_msg->local_set_difference = htonl (diff_remote);
2606  GNUNET_MQ_send (op->mq,
2607  ev);
2608  }
2609  }
2610  else
2611  {
2613  "# of ibf sends",
2614  1,
2615  GNUNET_NO);
2616  if (GNUNET_OK !=
2617  send_ibf (op,
2619  op->ibf_number_buckets_per_element,
2620  op->ibf_bucket_number_factor)))
2621  {
2622  /* Internal error, best we can do is shut the connection */
2624  "Failed to send IBF, closing connection\n");
2626  return;
2627  }
2628  }
2629  GNUNET_CADET_receive_done (op->channel);
2630 }
2631 
2632 
2640 static int
2642  uint32_t key,
2643  void *value)
2644 {
2645  struct SendElementClosure *sec = cls;
2646  struct Operation *op = sec->op;
2647  struct KeyEntry *ke = value;
2648  struct GNUNET_MQ_Envelope *ev;
2649  struct GNUNET_MessageHeader *mh;
2650 
2651  /* Detect 32-bit key collision for the 64-bit IBF keys. */
2652  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2653  {
2654  op->active_passive_switch_required = true;
2655  return GNUNET_YES;
2656  }
2657 
2658  /* Prevent implementation from sending a offer multiple times in case of roll switch */
2659  if (GNUNET_YES ==
2661  op->message_control_flow,
2662  &ke->element->element_hash,
2663  OFFER_MESSAGE)
2664  )
2665  {
2667  "Skipping already sent processed element offer!\n");
2668  return GNUNET_YES;
2669  }
2670 
2671  /* Save send offer message for message control */
2672  if (GNUNET_YES !=
2674  op->message_control_flow,
2675  MSG_CFS_SENT,
2676  &ke->element->element_hash,
2677  OFFER_MESSAGE)
2678  )
2679  {
2681  "Double offer message sent found!\n");
2682  GNUNET_break (0);
2684  return GNUNET_NO;
2685  }
2686  ;
2687 
2688  /* Mark element to be expected to received */
2689  if (GNUNET_YES !=
2691  op->message_control_flow,
2693  &ke->element->element_hash,
2695  )
2696  {
2698  "Double demand received found!\n");
2699  GNUNET_break (0);
2701  return GNUNET_NO;
2702  }
2703  ;
2704 #if MEASURE_PERFORMANCE
2705  perf_store.offer.sent += 1;
2706  perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2707 #endif
2709  sizeof(struct GNUNET_HashCode),
2711  GNUNET_assert (NULL != ev);
2712  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2714  "[OP %p] sending element offer (%s) to peer\n",
2715  op,
2716  GNUNET_h2s (&ke->element->element_hash));
2717  GNUNET_MQ_send (op->mq, ev);
2718  return GNUNET_YES;
2719 }
2720 
2721 
2728 void
2730  struct IBF_Key ibf_key)
2731 {
2732  struct SendElementClosure send_cls;
2733 
2734  send_cls.ibf_key = ibf_key;
2735  send_cls.op = op;
2737  op->key_to_element,
2738  (uint32_t) ibf_key.
2739  key_val,
2741  &send_cls);
2742 }
2743 
2744 
2752 static int
2754 {
2755  struct IBF_Key key;
2756  struct IBF_Key last_key;
2757  int side;
2758  unsigned int num_decoded;
2759  struct InvertibleBloomFilter *diff_ibf;
2760 
2762 
2763  if (GNUNET_OK !=
2764  prepare_ibf (op,
2765  op->remote_ibf->size))
2766  {
2767  GNUNET_break (0);
2768  /* allocation failed */
2769  return GNUNET_SYSERR;
2770  }
2771 
2772  diff_ibf = ibf_dup (op->local_ibf);
2773  ibf_subtract (diff_ibf,
2774  op->remote_ibf);
2775 
2776  ibf_destroy (op->remote_ibf);
2777  op->remote_ibf = NULL;
2778 
2780  "decoding IBF (size=%u)\n",
2781  diff_ibf->size);
2782 
2783  num_decoded = 0;
2784  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2785 
2786  while (1)
2787  {
2788  int res;
2789  int cycle_detected = GNUNET_NO;
2790 
2791  last_key = key;
2792 
2793  res = ibf_decode (diff_ibf,
2794  &side,
2795  &key);
2796  if (res == GNUNET_OK)
2797  {
2799  "decoded ibf key %lx\n",
2800  (unsigned long) key.key_val);
2801  num_decoded += 1;
2802  if ((num_decoded > diff_ibf->size) ||
2803  ((num_decoded > 1) &&
2804  (last_key.key_val == key.key_val)))
2805  {
2807  "detected cyclic ibf (decoded %u/%u)\n",
2808  num_decoded,
2809  diff_ibf->size);
2810  cycle_detected = GNUNET_YES;
2811  }
2812  }
2813  if ((GNUNET_SYSERR == res) ||
2814  (GNUNET_YES == cycle_detected))
2815  {
2816  uint32_t next_size;
2819  next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2820  diff_ibf->size);
2823  uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2824 
2825  if (next_size<ibf_min_size)
2826  next_size = ibf_min_size;
2827 
2828 
2829  if (next_size <= MAX_IBF_SIZE)
2830  {
2832  "decoding failed, sending larger ibf (size %u)\n",
2833  next_size);
2835  "# of IBF retries",
2836  1,
2837  GNUNET_NO);
2838 #if MEASURE_PERFORMANCE
2839  perf_store.active_passive_switches += 1;
2840 #endif
2841 
2842  op->salt_send = op->salt_receive++;
2843 
2844  if (GNUNET_OK !=
2845  send_ibf (op, next_size))
2846  {
2847  /* Internal error, best we can do is shut the connection */
2849  "Failed to send IBF, closing connection\n");
2851  ibf_destroy (diff_ibf);
2852  return GNUNET_SYSERR;
2853  }
2854  }
2855  else
2856  {
2858  "# of failed union operations (too large)",
2859  1,
2860  GNUNET_NO);
2861  // XXX: Send the whole set, element-by-element
2863  "set union failed: reached ibf limit\n");
2865  ibf_destroy (diff_ibf);
2866  return GNUNET_SYSERR;
2867  }
2868  break;
2869  }
2870  if (GNUNET_NO == res)
2871  {
2872  struct GNUNET_MQ_Envelope *ev;
2873 
2875  "transmitted all values, sending DONE\n");
2876 
2877 #if MEASURE_PERFORMANCE
2878  perf_store.done.sent += 1;
2879 #endif
2881  GNUNET_MQ_send (op->mq, ev);
2882  /* We now wait until we get a DONE message back
2883  * and then wait for our MQ to be flushed and all our
2884  * demands be delivered. */
2885  break;
2886  }
2887  if (1 == side)
2888  {
2889  struct IBF_Key unsalted_key;
2890  unsalt_key (&key,
2891  op->salt_receive,
2892  &unsalted_key);
2894  unsalted_key);
2895  }
2896  else if (-1 == side)
2897  {
2898  struct GNUNET_MQ_Envelope *ev;
2899  struct InquiryMessage *msg;
2900 
2901 #if MEASURE_PERFORMANCE
2902  perf_store.inquery.sent += 1;
2903  perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2904 #endif
2905 
2907  struct GNUNET_HashContext *hashed_key_context =
2909  struct GNUNET_HashCode *hashed_key = (struct
2911  sizeof(struct GNUNET_HashCode));
2913  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2914  &key,
2915  sizeof(struct IBF_Key));
2916  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2917  hashed_key);
2918  GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2919  hashed_key,
2920  &mcfs,
2922  );
2923 
2924  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2925  * the effort additional complexity. */
2926  ev = GNUNET_MQ_msg_extra (msg,
2927  sizeof(struct IBF_Key),
2929  msg->salt = htonl (op->salt_receive);
2930  GNUNET_memcpy (&msg[1],
2931  &key,
2932  sizeof(struct IBF_Key));
2934  "sending element inquiry for IBF key %lx\n",
2935  (unsigned long) key.key_val);
2936  GNUNET_MQ_send (op->mq, ev);
2937  }
2938  else
2939  {
2940  GNUNET_assert (0);
2941  }
2942  }
2943  ibf_destroy (diff_ibf);
2944  return GNUNET_OK;
2945 }
2946 
2947 
2955 static int
2957  const struct TransmitFullMessage *msg)
2958 {
2959  return GNUNET_OK;
2960 }
2961 
2962 
2969 static void
2971  const struct TransmitFullMessage *msg)
2972 {
2973  struct Operation *op = cls;
2974 
2978  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2979  if (GNUNET_OK !=
2980  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2981  {
2982  GNUNET_break (0);
2984  return;
2985  }
2986 
2988  op->remote_element_count = ntohl (msg->remote_set_size);
2989  op->remote_set_diff = ntohl (msg->remote_set_difference);
2990  op->local_set_diff = ntohl (msg->local_set_difference);
2991 
2994  {
2996  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2997  "criteria\n");
2998  GNUNET_break_op (0);
3000  return;
3001  }
3002 
3004  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3005  op->set->content->elements);
3006  uint64_t avg_element_size = 0;
3007  if (0 < op->local_element_count)
3008  {
3009  op->total_elements_size_local = 0;
3010  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3011  &
3013  op);
3014  avg_element_size = op->total_elements_size_local / op->local_element_count;
3015  }
3016 
3018  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3019  op->
3021  op->
3023  op->local_set_diff,
3024  op->remote_set_diff,
3025  op->
3027  op->
3030  {
3032  "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3033  " : %d\n", mode_of_operation);
3034  GNUNET_break_op (0);
3036  return;
3037  }
3038  op->phase = PHASE_FULL_RECEIVING;
3039 }
3040 
3041 
3052 static int
3054  const struct IBFMessage *msg)
3055 {
3056  struct Operation *op = cls;
3057  unsigned int buckets_in_message;
3058 
3059  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3060  / IBF_BUCKET_SIZE;
3061  if (0 == buckets_in_message)
3062  {
3063  GNUNET_break_op (0);
3064  return GNUNET_SYSERR;
3065  }
3066  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3067  * IBF_BUCKET_SIZE)
3068  {
3069  GNUNET_break_op (0);
3070  return GNUNET_SYSERR;
3071  }
3072  if (op->phase == PHASE_EXPECT_IBF_LAST)
3073  {
3074  if (ntohl (msg->offset) != op->ibf_buckets_received)
3075  {
3076  GNUNET_break_op (0);
3077  return GNUNET_SYSERR;
3078  }
3079 
3080  if (msg->ibf_size != op->remote_ibf->size)
3081  {
3082  GNUNET_break_op (0);
3083  return GNUNET_SYSERR;
3084  }
3085  if (ntohl (msg->salt) != op->salt_receive)
3086  {
3087  GNUNET_break_op (0);
3088  return GNUNET_SYSERR;
3089  }
3090  }
3091  else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3092  (op->phase != PHASE_EXPECT_IBF))
3093  {
3094  GNUNET_break_op (0);
3095  return GNUNET_SYSERR;
3096  }
3097 
3098  return GNUNET_OK;
3099 }
3100 
3101 
3111 static void
3113  const struct IBFMessage *msg)
3114 {
3115  struct Operation *op = cls;
3116  unsigned int buckets_in_message;
3120  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3122  if (GNUNET_OK !=
3123  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3124  {
3125  GNUNET_break (0);
3127  return;
3128  }
3129  op->differential_sync_iterations++;
3131  op->active_passive_switch_required = false;
3132 
3133 #if MEASURE_PERFORMANCE
3134  perf_store.ibf.received += 1;
3135  perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3136 #endif
3137 
3138  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3139  / IBF_BUCKET_SIZE;
3140  if ((op->phase == PHASE_PASSIVE_DECODING) ||
3141  (op->phase == PHASE_EXPECT_IBF))
3142  {
3143  op->phase = PHASE_EXPECT_IBF_LAST;
3144  GNUNET_assert (NULL == op->remote_ibf);
3146  "Creating new ibf of size %u\n",
3147  ntohl (msg->ibf_size));
3148  // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3149  op->remote_ibf = ibf_create (msg->ibf_size,
3150  ((uint8_t) op->ibf_number_buckets_per_element));
3151  op->salt_receive = ntohl (msg->salt);
3153  "Receiving new IBF with salt %u\n",
3154  op->salt_receive);
3155  if (NULL == op->remote_ibf)
3156  {
3158  "Failed to parse remote IBF, closing connection\n");
3160  return;
3161  }
3162  op->ibf_buckets_received = 0;
3163  if (0 != ntohl (msg->offset))
3164  {
3165  GNUNET_break_op (0);
3167  return;
3168  }
3169  }
3170  else
3171  {
3174  "Received more of IBF\n");
3175  }
3176  GNUNET_assert (NULL != op->remote_ibf);
3177 
3178  ibf_read_slice (&msg[1],
3179  op->ibf_buckets_received,
3180  buckets_in_message,
3181  op->remote_ibf, msg->ibf_counter_bit_length);
3182  op->ibf_buckets_received += buckets_in_message;
3183 
3184  if (op->ibf_buckets_received == op->remote_ibf->size)
3185  {
3187  "received full ibf\n");
3188  op->phase = PHASE_ACTIVE_DECODING;
3189  if (GNUNET_OK !=
3190  decode_and_send (op))
3191  {
3192  /* Internal error, best we can do is shut down */
3194  "Failed to decode IBF, closing connection\n");
3196  return;
3197  }
3198  }
3199  GNUNET_CADET_receive_done (op->channel);
3200 }
3201 
3202 
3211 static void
3213  const struct GNUNET_SETU_Element *element,
3215 {
3216  struct GNUNET_MQ_Envelope *ev;
3217  struct GNUNET_SETU_ResultMessage *rm;
3218 
3220  "sending element (size %u) to client\n",
3221  element->size);
3222  GNUNET_assert (0 != op->client_request_id);
3223  ev = GNUNET_MQ_msg_extra (rm,
3224  element->size,
3226  if (NULL == ev)
3227  {
3228  GNUNET_MQ_discard (ev);
3229  GNUNET_break (0);
3230  return;
3231  }
3232  rm->result_status = htons (status);
3233  rm->request_id = htonl (op->client_request_id);
3234  rm->element_type = htons (element->element_type);
3236  op->key_to_element));
3237  GNUNET_memcpy (&rm[1],
3238  element->data,
3239  element->size);
3240  GNUNET_MQ_send (op->set->cs->mq,
3241  ev);
3242 }
3243 
3244 
3250 static void
3252 {
3253  unsigned int num_demanded;
3254 
3255  num_demanded = GNUNET_CONTAINER_multihashmap_size (
3256  op->demanded_hashes);
3257  int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3258  op->message_control_flow,
3259  &
3261  op);
3262  if (PHASE_FINISH_WAITING == op->phase)
3263  {
3265  "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3266  num_demanded, op->peer_site);
3267  if (-1 != send_done)
3268  {
3269  struct GNUNET_MQ_Envelope *ev;
3270 
3271  op->phase = PHASE_FINISHED;
3272 #if MEASURE_PERFORMANCE
3273  perf_store.done.sent += 1;
3274 #endif
3276  GNUNET_MQ_send (op->mq,
3277  ev);
3278  /* We now wait until the other peer sends P2P_OVER
3279  * after it got all elements from us. */
3280  }
3281  }
3282  if (PHASE_FINISH_CLOSING == op->phase)
3283  {
3285  "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3286  num_demanded, op->peer_site);
3287  if (-1 != send_done)
3288  {
3289  op->phase = PHASE_FINISHED;
3290  send_client_done (op);
3292  }
3293  }
3294 }
3295 
3296 
3303 static int
3305  const struct GNUNET_SETU_ElementMessage *emsg)
3306 {
3307  struct Operation *op = cls;
3308 
3309  if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3310  {
3311  GNUNET_break_op (0);
3312  return GNUNET_SYSERR;
3313  }
3314  return GNUNET_OK;
3315 }
3316 
3317 
3326 static void
3328  const struct GNUNET_SETU_ElementMessage *emsg)
3329 {
3330  struct Operation *op = cls;
3331  struct ElementEntry *ee;
3332  struct KeyEntry *ke;
3333  uint16_t element_size;
3334 
3338  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3340  if (GNUNET_OK !=
3341  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3342  {
3343  GNUNET_break (0);
3345  return;
3346  }
3347 
3348  element_size = ntohs (emsg->header.size) - sizeof(struct
3350 #if MEASURE_PERFORMANCE
3351  perf_store.element.received += 1;
3352  perf_store.element.received_var_bytes += element_size;
3353 #endif
3354 
3355  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3356  GNUNET_memcpy (&ee[1],
3357  &emsg[1],
3358  element_size);
3359  ee->element.size = element_size;
3360  ee->element.data = &ee[1];
3361  ee->element.element_type = ntohs (emsg->element_type);
3362  ee->remote = GNUNET_YES;
3364  &ee->element_hash);
3365  if (GNUNET_NO ==
3366  GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3367  &ee->element_hash,
3368  NULL))
3369  {
3370  /* We got something we didn't demand, since it's not in our map. */
3371  GNUNET_break_op (0);
3373  return;
3374  }
3375 
3376  if (GNUNET_OK !=
3378  op->message_control_flow,
3380  &ee->element_hash,
3382  )
3383  {
3385  "An element has been received more than once!\n");
3386  GNUNET_break (0);
3388  return;
3389  }
3390 
3392  "Got element (size %u, hash %s) from peer\n",
3393  (unsigned int) element_size,
3394  GNUNET_h2s (&ee->element_hash));
3395 
3397  "# received elements",
3398  1,
3399  GNUNET_NO);
3401  "# exchanged elements",
3402  1,
3403  GNUNET_NO);
3404 
3405  op->received_total++;
3406 
3407  ke = op_get_element (op,
3408  &ee->element_hash);
3409  if (NULL != ke)
3410  {
3411  /* Got repeated element. Should not happen since
3412  * we track demands. */
3414  "# repeated elements",
3415  1,
3416  GNUNET_NO);
3417  ke->received = GNUNET_YES;
3418  GNUNET_free (ee);
3419  }
3420  else
3421  {
3423  "Registering new element from remote peer\n");
3424  op->received_fresh++;
3426  /* only send results immediately if the client wants it */
3428  &ee->element,
3430  }
3431 
3432  if ((op->received_total > 8) &&
3433  (op->received_fresh < op->received_total / 3))
3434  {
3435  /* The other peer gave us lots of old elements, there's something wrong. */
3436  GNUNET_break_op (0);
3438  return;
3439  }
3440  GNUNET_CADET_receive_done (op->channel);
3441  maybe_finish (op);
3442 }
3443 
3444 
3451 static int
3453  const struct GNUNET_SETU_ElementMessage *emsg)
3454 {
3455  struct Operation *op = cls;
3456 
3457  (void) op;
3458 
3459  // FIXME: check that we expect full elements here?
3460  return GNUNET_OK;
3461 }
3462 
3463 
3470 static void
3472  const struct GNUNET_SETU_ElementMessage *emsg)
3473 {
3474  struct Operation *op = cls;
3475  struct ElementEntry *ee;
3476  struct KeyEntry *ke;
3477  uint16_t element_size;
3478 
3482  uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3483  if (GNUNET_OK !=
3484  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3485  {
3486  GNUNET_break (0);
3488  return;
3489  }
3490 
3491  element_size = ntohs (emsg->header.size)
3492  - sizeof(struct GNUNET_SETU_ElementMessage);
3493 
3494 #if MEASURE_PERFORMANCE
3495  perf_store.element_full.received += 1;
3496  perf_store.element_full.received_var_bytes += element_size;
3497 #endif
3498 
3499  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3500  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3501  ee->element.size = element_size;
3502  ee->element.data = &ee[1];
3503  ee->element.element_type = ntohs (emsg->element_type);
3504  ee->remote = GNUNET_YES;
3506  &ee->element_hash);
3508  "Got element (full diff, size %u, hash %s) from peer\n",
3509  (unsigned int) element_size,
3510  GNUNET_h2s (&ee->element_hash));
3511 
3513  "# received elements",
3514  1,
3515  GNUNET_NO);
3517  "# exchanged elements",
3518  1,
3519  GNUNET_NO);
3520 
3521  op->received_total++;
3522  ke = op_get_element (op,
3523  &ee->element_hash);
3524  if (NULL != ke)
3525  {
3527  "# repeated elements",
3528  1,
3529  GNUNET_NO);
3531  ke->received = GNUNET_YES;
3532  GNUNET_free (ee);
3533  }
3534  else
3535  {
3537  "Registering new element from remote peer\n");
3538  op->received_fresh++;
3540  /* only send results immediately if the client wants it */
3542  &ee->element,
3544  }
3545 
3546 
3547  if ((GNUNET_YES == op->byzantine) &&
3548  (op->received_total > op->remote_element_count) )
3549  {
3550  /* The other peer gave us lots of old elements, there's something wrong. */
3552  "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3553  (unsigned long long) op->received_total,
3554  (unsigned long long) op->remote_element_count);
3555  GNUNET_break_op (0);
3557  return;
3558  }
3559  GNUNET_CADET_receive_done (op->channel);
3560 }
3561 
3562 
3570 static int
3572  const struct InquiryMessage *msg)
3573 {
3574  struct Operation *op = cls;
3575  unsigned int num_keys;
3576 
3577  if (op->phase != PHASE_PASSIVE_DECODING)
3578  {
3579  GNUNET_break_op (0);
3580  return GNUNET_SYSERR;
3581  }
3582  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3583  / sizeof(struct IBF_Key);
3584  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3585  != num_keys * sizeof(struct IBF_Key))
3586  {
3587  GNUNET_break_op (0);
3588  return GNUNET_SYSERR;
3589  }
3590  return GNUNET_OK;
3591 }
3592 
3593 
3600 static void
3602  const struct InquiryMessage *msg)
3603 {
3604  struct Operation *op = cls;
3605  const struct IBF_Key *ibf_key;
3606  unsigned int num_keys;
3607 
3611  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3612  if (GNUNET_OK !=
3613  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3614  {
3615  GNUNET_break (0);
3617  return;
3618  }
3619 
3620 #if MEASURE_PERFORMANCE
3621  perf_store.inquery.received += 1;
3622  perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3623  - sizeof(struct InquiryMessage));
3624 #endif
3625 
3627  "Received union inquiry\n");
3628  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3629  / sizeof(struct IBF_Key);
3630  ibf_key = (const struct IBF_Key *) &msg[1];
3631 
3633  struct GNUNET_HashContext *hashed_key_context =
3635  struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3636  sizeof(struct GNUNET_HashCode));;
3638  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3639  &ibf_key,
3640  sizeof(struct IBF_Key));
3641  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3642  hashed_key);
3643  GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3644  hashed_key,
3645  &mcfs,
3647  );
3648 
3649  while (0 != num_keys--)
3650  {
3651  struct IBF_Key unsalted_key;
3652  unsalt_key (ibf_key,
3653  ntohl (msg->salt),
3654  &unsalted_key);
3656  unsalted_key);
3657  ibf_key++;
3658  }
3659  GNUNET_CADET_receive_done (op->channel);
3660 }
3661 
3662 
3673 static int
3675  uint32_t key,
3676  void *value)
3677 {
3678  struct Operation *op = cls;
3679  struct KeyEntry *ke = value;
3680  struct GNUNET_MQ_Envelope *ev;
3681  struct GNUNET_SETU_ElementMessage *emsg;
3682  struct ElementEntry *ee = ke->element;
3683 
3684  if (GNUNET_YES == ke->received)
3685  return GNUNET_YES;
3686 #if MEASURE_PERFORMANCE
3687  perf_store.element_full.received += 1;
3688 #endif
3689  ev = GNUNET_MQ_msg_extra (emsg,
3690  ee->element.size,
3692  GNUNET_memcpy (&emsg[1],
3693  ee->element.data,
3694  ee->element.size);
3695  emsg->element_type = htons (ee->element.element_type);
3696  GNUNET_MQ_send (op->mq,
3697  ev);
3698  return GNUNET_YES;
3699 }
3700 
3701 
3708 static int
3710  const struct TransmitFullMessage *mh)
3711 {
3712  return GNUNET_OK;
3713 }
3714 
3715 
3716 static void
3718  const struct TransmitFullMessage *msg)
3719 {
3720  struct Operation *op = cls;
3721 
3725  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3726  if (GNUNET_OK !=
3727  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3728  {
3729  GNUNET_break (0);
3731  return;
3732  }
3733 
3734  op->remote_element_count = ntohl (msg->remote_set_size);
3735  op->remote_set_diff = ntohl (msg->remote_set_difference);
3736  op->local_set_diff = ntohl (msg->local_set_difference);
3737 
3738 
3740  {
3742  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3743  "criteria\n");
3744  GNUNET_break_op (0);
3746  return;
3747  }
3748 
3749 #if MEASURE_PERFORMANCE
3750  perf_store.request_full.received += 1;
3751 #endif
3752 
3754  "Received request for full set transmission\n");
3755 
3757  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3758  op->set->content->elements);
3759  uint64_t avg_element_size = 0;
3760  if (0 < op->local_element_count)
3761  {
3762  op->total_elements_size_local = 0;
3763  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3764  &
3766  op);
3767  avg_element_size = op->total_elements_size_local / op->local_element_count;
3768  }
3769 
3770  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3771  op->
3773  op->
3775  op->local_set_diff,
3776  op->remote_set_diff,
3777  op->
3779  op->
3782  {
3784  "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3785  " : %d\n", mode_of_operation);
3786  GNUNET_break_op (0);
3788  return;
3789  }
3790 
3791  // FIXME: we need to check that our set is larger than the
3792  // byzantine_lower_bound by some threshold
3793  send_full_set (op);
3794  GNUNET_CADET_receive_done (op->channel);
3795 }
3796 
3797 
3804 static void
3806  const struct GNUNET_MessageHeader *mh)
3807 {
3808  struct Operation *op = cls;
3809 
3813  uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3814  if (GNUNET_OK !=
3815  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3816  {
3817  GNUNET_break (0);
3819  return;
3820  }
3821 
3822 #if MEASURE_PERFORMANCE
3823  perf_store.full_done.received += 1;
3824 #endif
3825 
3826  switch (op->phase)
3827  {
3828  case PHASE_FULL_RECEIVING:
3829  {
3830  struct GNUNET_MQ_Envelope *ev;
3831 
3832  if ((GNUNET_YES == op->byzantine) &&
3833  (op->received_total != op->remote_element_count) )
3834  {
3835  /* The other peer gave not enough elements before sending full done, there's something wrong. */
3837  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3838  (unsigned long long) op->received_total,
3839  (unsigned long long) op->remote_element_count);
3840  GNUNET_break_op (0);
3842  return;
3843  }
3844 
3846  "got FULL DONE, sending elements that other peer is missing\n");
3847 
3848  /* send all the elements that did not come from the remote peer */
3851  op);
3852 #if MEASURE_PERFORMANCE
3853  perf_store.full_done.sent += 1;
3854 #endif
3856  GNUNET_MQ_send (op->mq,
3857  ev);
3858  op->phase = PHASE_FINISHED;
3859  /* we now wait until the other peer sends us the OVER message*/
3860  }
3861  break;
3862 
3863  case PHASE_FULL_SENDING:
3864  {
3866  "got FULL DONE, finishing\n");
3867  /* We sent the full set, and got the response for that. We're done. */
3868  op->phase = PHASE_FINISHED;
3869  GNUNET_CADET_receive_done (op->channel);
3870  send_client_done (op);
3872  return;
3873  }
3874 
3875  default:
3877  "Handle full done phase is %u\n",
3878  (unsigned) op->phase);
3879  GNUNET_break_op (0);
3881  return;
3882  }
3883  GNUNET_CADET_receive_done (op->channel);
3884 }
3885 
3886 
3895 static int
3897  const struct GNUNET_MessageHeader *mh)
3898 {
3899  struct Operation *op = cls;
3900  unsigned int num_hashes;
3901 
3902  (void) op;
3903  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3904  / sizeof(struct GNUNET_HashCode);
3905  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3906  != num_hashes * sizeof(struct GNUNET_HashCode))
3907  {
3908  GNUNET_break_op (0);
3909  return GNUNET_SYSERR;
3910  }
3911  return GNUNET_OK;
3912 }
3913 
3914 
3922 static void
3924  const struct GNUNET_MessageHeader *mh)
3925 {
3926  struct Operation *op = cls;
3927  struct ElementEntry *ee;
3928  struct GNUNET_SETU_ElementMessage *emsg;
3929  const struct GNUNET_HashCode *hash;
3930  unsigned int num_hashes;
3931  struct GNUNET_MQ_Envelope *ev;
3932 
3936  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3938  if (GNUNET_OK !=
3939  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3940  {
3941  GNUNET_break (0);
3943  return;
3944  }
3945 #if MEASURE_PERFORMANCE
3946  perf_store.demand.received += 1;
3947  perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3949 #endif
3950 
3951  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3952  / sizeof(struct GNUNET_HashCode);
3953  for (hash = (const struct GNUNET_HashCode *) &mh[1];
3954  num_hashes > 0;
3955  hash++, num_hashes--)
3956  {
3957  ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3958  hash);
3959  if (NULL == ee)
3960  {
3961  /* Demand for non-existing element. */
3962  GNUNET_break_op (0);
3964  return;
3965  }
3966 
3967  /* Save send demand message for message control */
3968  if (GNUNET_YES !=
3970  op->message_control_flow,
3972  &ee->element_hash,
3974  )
3975  {
3977  "Double demand message received found!\n");
3978  GNUNET_break (0);
3980  return;
3981  }
3982  ;
3983 
3984  /* Mark element to be expected to received */
3985  if (GNUNET_YES !=
3987  op->message_control_flow,
3988  MSG_CFS_SENT,
3989  &ee->element_hash,
3991  )
3992  {
3994  "Double element message sent found!\n");
3995  GNUNET_break (0);
3997  return;
3998  }
4000  {
4001  /* Probably confused lazily copied sets. */
4002  GNUNET_break_op (0);
4004  return;
4005  }
4006 #if MEASURE_PERFORMANCE
4007  perf_store.element.sent += 1;
4008  perf_store.element.sent_var_bytes += ee->element.size;
4009 #endif
4010  ev = GNUNET_MQ_msg_extra (emsg,
4011  ee->element.size,
4013  GNUNET_memcpy (&emsg[1],
4014  ee->element.data,
4015  ee->element.size);
4016  emsg->reserved = htons (0);
4017  emsg->element_type = htons (ee->element.element_type);
4019  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
4020  op,
4021  (unsigned int) ee->element.size,
4022  GNUNET_h2s (&ee->element_hash));
4023  GNUNET_MQ_send (op->mq, ev);
4025  "# exchanged elements",
4026  1,
4027  GNUNET_NO);
4028  if (op->symmetric)
4030  &ee->element,
4032  }
4033  GNUNET_CADET_receive_done (op->channel);
4034  maybe_finish (op);
4035 }
4036 
4037 
4045 static int
4047  const struct GNUNET_MessageHeader *mh)
4048 {
4049  struct Operation *op = cls;
4050  unsigned int num_hashes;
4051 
4052  /* look up elements and send them */
4053  if ((op->phase != PHASE_PASSIVE_DECODING) &&
4054  (op->phase != PHASE_ACTIVE_DECODING))
4055  {
4056  GNUNET_break_op (0);
4057  return GNUNET_SYSERR;
4058  }
4059  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4060  / sizeof(struct GNUNET_HashCode);
4061  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4062  num_hashes * sizeof(struct GNUNET_HashCode))
4063  {
4064  GNUNET_break_op (0);
4065  return GNUNET_SYSERR;
4066  }
4067  return GNUNET_OK;
4068 }
4069 
4070 
4078 static void
4080  const struct GNUNET_MessageHeader *mh)
4081 {
4082  struct Operation *op = cls;
4083  const struct GNUNET_HashCode *hash;
4084  unsigned int num_hashes;
4088  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4089  if (GNUNET_OK !=
4090  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4091  {
4092  GNUNET_break (0);
4094  return;
4095  }
4096 
4097 #if MEASURE_PERFORMANCE
4098  perf_store.offer.received += 1;
4099  perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4101 #endif
4102 
4103  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4104  / sizeof(struct GNUNET_HashCode);
4105  for (hash = (const struct GNUNET_HashCode *) &mh[1];
4106  num_hashes > 0;
4107  hash++, num_hashes--)
4108  {
4109  struct ElementEntry *ee;
4110  struct GNUNET_MessageHeader *demands;
4111  struct GNUNET_MQ_Envelope *ev;
4112 
4113  ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4114  hash);
4115  if (NULL != ee)
4117  continue;
4118 
4119  if (GNUNET_YES ==
4120  GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
4121  hash))
4122  {
4124  "Skipped sending duplicate demand\n");
4125  continue;
4126  }
4127 
4130  op->demanded_hashes,
4131  hash,
4132  NULL,
4134 
4136  "[OP %p] Requesting element (hash %s)\n",
4137  op, GNUNET_h2s (hash));
4138 
4139 #if MEASURE_PERFORMANCE
4140  perf_store.demand.sent += 1;
4141  perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4142 #endif
4143  /* Save send demand message for message control */
4144  if (GNUNET_YES !=
4146  op->message_control_flow,
4147  MSG_CFS_SENT,
4148  hash,
4149  DEMAND_MESSAGE))
4150  {
4152  "Double demand message sent found!\n");
4153  GNUNET_break (0);
4155  return;
4156  }
4157 
4158  /* Mark offer as received received */
4159  if (GNUNET_YES !=
4161  op->message_control_flow,
4163  hash,
4164  OFFER_MESSAGE))
4165  {
4167  "Double offer message received found!\n");
4168  GNUNET_break (0);
4170  return;
4171  }
4172  /* Mark element to be expected to received */
4173  if (GNUNET_YES !=
4175  op->message_control_flow,
4177  hash,
4178  ELEMENT_MESSAGE))
4179  {
4181  "Element already expected!\n");
4182  GNUNET_break (0);
4184  return;
4185  }
4186  ev = GNUNET_MQ_msg_header_extra (demands,
4187  sizeof(struct GNUNET_HashCode),
4189  GNUNET_memcpy (&demands[1],
4190  hash,
4191  sizeof(struct GNUNET_HashCode));
4192  GNUNET_MQ_send (op->mq, ev);
4193  }
4194  GNUNET_CADET_receive_done (op->channel);
4195 }
4196 
4197 
4204 static void
4206  const struct GNUNET_MessageHeader *mh)
4207 {
4208  struct Operation *op = cls;
4209 
4213  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4214  if (GNUNET_OK !=
4215  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4216  {
4217  GNUNET_break (0);
4219  return;
4220  }
4221 
4222  if (op->active_passive_switch_required)
4223  {
4225  "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4226  GNUNET_break (0);
4228  return;
4229  }
4230 
4231 #if MEASURE_PERFORMANCE
4232  perf_store.done.received += 1;
4233 #endif
4234  switch (op->phase)
4235  {
4237  /* We got all requests, but still have to send our elements in response. */
4238  op->phase = PHASE_FINISH_WAITING;
4240  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4241  /* The active peer is done sending offers
4242  * and inquiries. This means that all
4243  * our responses to that (demands and offers)
4244  * must be in flight (queued or in mesh).
4245  *
4246  * We should notify the active peer once
4247  * all our demands are satisfied, so that the active
4248  * peer can quit if we gave it everything.
4249  */GNUNET_CADET_receive_done (op->channel);
4250  maybe_finish (op);
4251  return;
4252  case PHASE_ACTIVE_DECODING:
4254  "got DONE (as active partner), waiting to finish\n");
4255  /* All demands of the other peer are satisfied,
4256  * and we processed all offers, thus we know
4257  * exactly what our demands must be.
4258  *
4259  * We'll close the channel
4260  * to the other peer once our demands are met.
4261  */op->phase = PHASE_FINISH_CLOSING;
4262  GNUNET_CADET_receive_done (op->channel);
4263  maybe_finish (op);
4264  return;
4265  default:
4266  GNUNET_break_op (0);
4268  return;
4269  }
4270 }
4271 
4272 
4279 static void
4281  const struct GNUNET_MessageHeader *mh)
4282 {
4283 #if MEASURE_PERFORMANCE
4284  perf_store.over.received += 1;
4285 #endif
4286  send_client_done (cls);
4287 }
4288 
4289 
4298 static struct Operation *
4299 get_incoming (uint32_t id)
4300 {
4301  for (struct Listener *listener = listener_head;
4302  NULL != listener;
4303  listener = listener->next)
4304  {
4305  for (struct Operation *op = listener->op_head;
4306  NULL != op;
4307  op = op->next)
4308  if (op->suggest_id == id)
4309  return op;
4310  }
4311  return NULL;
4312 }
4313 
4314 
4323 static void *
4325  struct GNUNET_SERVICE_Client *c,
4326  struct GNUNET_MQ_Handle *mq)
4327 {
4328  struct ClientState *cs;
4329 
4330  num_clients++;
4331  cs = GNUNET_new (struct ClientState);
4332  cs->client = c;
4333  cs->mq = mq;
4334  return cs;
4335 }
4336 
4337 
4346 static int
4348  const struct GNUNET_HashCode *key,
4349  void *value)
4350 {
4351  struct ElementEntry *ee = value;
4352 
4353  GNUNET_free (ee);
4354  return GNUNET_YES;
4355 }
4356 
4357 
4365 static void
4367  struct GNUNET_SERVICE_Client *client,
4368  void *internal_cls)
4369 {
4370  struct ClientState *cs = internal_cls;
4371  struct Operation *op;
4372  struct Listener *listener;
4373  struct Set *set;
4374 
4376  "Client disconnected, cleaning up\n");
4377  if (NULL != (set = cs->set))
4378  {
4379  struct SetContent *content = set->content;
4380 
4382  "Destroying client's set\n");
4383  /* Destroy pending set operations */
4384  while (NULL != set->ops_head)
4386 
4387  /* Destroy operation-specific state */
4388  if (NULL != set->se)
4389  {
4391  set->se = NULL;
4392  }
4393  /* free set content (or at least decrement RC) */
4394  set->content = NULL;
4395  GNUNET_assert (0 != content->refcount);
4396  content->refcount--;
4397  if (0 == content->refcount)
4398  {
4399  GNUNET_assert (NULL != content->elements);
4402  NULL);
4404  content->elements = NULL;
4405  GNUNET_free (content);
4406  }
4407  GNUNET_free (set);
4408  }
4409 
4410  if (NULL != (listener = cs->listener))
4411  {
4413  "Destroying client's listener\n");
4414  GNUNET_CADET_close_port (listener->open_port);
4415  listener->open_port = NULL;
4416  while (NULL != (op = listener->op_head))
4417  {
4419  "Destroying incoming operation `%u' from peer `%s'\n",
4420  (unsigned int) op->client_request_id,
4421  GNUNET_i2s (&op->peer));
4422  incoming_destroy (op);
4423  }
4425  listener_tail,
4426  listener);
4427  GNUNET_free (listener);
4428  }
4429  GNUNET_free (cs);
4430  num_clients--;
4431  if ( (GNUNET_YES == in_shutdown) &&
4432  (0 == num_clients) )
4433  {
4434  if (NULL != cadet)
4435  {
4437  cadet = NULL;
4438  }
4439  }
4440 }
4441 
4442 
4451 static int
4453  const struct OperationRequestMessage *msg)
4454 {
4455  struct Operation *op = cls;
4456  struct Listener *listener = op->listener;
4457  const struct GNUNET_MessageHeader *nested_context;
4458 
4459  /* double operation request */
4460  if (0 != op->suggest_id)
4461  {
4462  GNUNET_break_op (0);
4463  return GNUNET_SYSERR;
4464  }
4465  /* This should be equivalent to the previous condition, but can't hurt to check twice */
4466  if (NULL == listener)
4467  {
4468  GNUNET_break (0);
4469  return GNUNET_SYSERR;
4470  }
4471  nested_context = GNUNET_MQ_extract_nested_mh (msg);
4472  if ((NULL != nested_context) &&
4473  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4474  {
4475  GNUNET_break_op (0);
4476  return GNUNET_SYSERR;
4477  }
4478  return GNUNET_OK;
4479 }
4480 
4481 
4497 static void
4499  const struct OperationRequestMessage *msg)
4500 {
4501  struct Operation *op = cls;
4502  struct Listener *listener = op->listener;
4503  const struct GNUNET_MessageHeader *nested_context;
4504  struct GNUNET_MQ_Envelope *env;
4505  struct GNUNET_SETU_RequestMessage *cmsg;
4506 
4507  nested_context = GNUNET_MQ_extract_nested_mh (msg);
4508  /* Make a copy of the nested_context (application-specific context
4509  information that is opaque to set) so we can pass it to the
4510  listener later on */
4511  if (NULL != nested_context)
4512  op->context_msg = GNUNET_copy_message (nested_context);
4513  op->remote_element_count = ntohl (msg->element_count);
4514  GNUNET_log (
4516  "Received P2P operation request (port %s) for active listener\n",
4517  GNUNET_h2s (&op->listener->app_id));
4518  GNUNET_assert (0 == op->suggest_id);
4519  if (0 == suggest_id)
4520  suggest_id++;
4521  op->suggest_id = suggest_id++;
4522  GNUNET_assert (NULL != op->timeout_task);
4523  GNUNET_SCHEDULER_cancel (op->timeout_task);
4524  op->timeout_task = NULL;
4525  env = GNUNET_MQ_msg_nested_mh (cmsg,
4527  op->context_msg);
4528  GNUNET_log (
4530  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4531  op->suggest_id,
4532  listener,
4533  listener->cs);
4534  cmsg->accept_id = htonl (op->suggest_id);
4535  cmsg->peer_id = op->peer;
4536  GNUNET_MQ_send (listener->cs->mq,
4537  env);
4538  /* NOTE: GNUNET_CADET_receive_done() will be called in
4539  #handle_client_accept() */
4540 }
4541 
4542 
4551 static void
4553  const struct GNUNET_SETU_CreateMessage *msg)
4554 {
4555  struct ClientState *cs = cls;
4556  struct Set *set;
4557 
4559  "Client created new set for union operation\n");
4560  if (NULL != cs->set)
4561  {
4562  /* There can only be one set per client */
4563  GNUNET_break (0);
4565  return;
4566  }
4567  set = GNUNET_new (struct Set);
4568  {
4569  struct MultiStrataEstimator *se;
4570 
4573  SE_IBF_HASH_NUM);
4574  if (NULL == se)
4575  {
4577  "Failed to allocate strata estimator\n");
4578  GNUNET_free (set);
4580  return;
4581  }
4582  set->se = se;
4583  }
4584  set->content = GNUNET_new (struct SetContent);
4585  set->content->refcount = 1;
4587  GNUNET_YES);
4588  set->cs = cs;
4589  cs->set = set;
4591 }
4592 
4593 
4603 static void
4605 {
4606  struct Operation *op = cls;
4607 
4608  op->timeout_task = NULL;
4610  "Remote peer's incoming request timed out\n");
4611  incoming_destroy (op);
4612 }
4613 
4614 
4631 static void *
4632 channel_new_cb (void *cls,
4633  struct GNUNET_CADET_Channel *channel,
4634  const struct GNUNET_PeerIdentity *source)
4635 {
4636  struct Listener *listener = cls;
4637  struct Operation *op;
4638 
4640  "New incoming channel\n");
4641  op = GNUNET_new (struct Operation);
4642  op->listener = listener;
4643  op->peer = *source;
4644  op->channel = channel;
4645  op->mq = GNUNET_CADET_get_mq (op->channel);
4647  UINT32_MAX);
4650  op);
4652  listener->op_tail,
4653  op);
4654  return op;
4655 }
4656 
4657 
4674 static void
4675 channel_end_cb (void *channel_ctx,
4676  const struct GNUNET_CADET_Channel *channel)
4677 {
4678  struct Operation *op = channel_ctx;
4679 
4680  op->channel = NULL;
4682 }
4683 
4684 
4699 static void
4701  const struct GNUNET_CADET_Channel *channel,
4702  int window_size)
4703 {
4704  /* FIXME: not implemented, we could do flow control here... */
4705 }
4706 
4707 
4715 static void
4717  const struct GNUNET_SETU_ListenMessage *msg)
4718 {
4719  struct ClientState *cs = cls;
4720  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4721  GNUNET_MQ_hd_var_size (incoming_msg,
4723  struct OperationRequestMessage,
4724  NULL),
4725  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4727  struct IBFMessage,
4728  NULL),
4729  GNUNET_MQ_hd_var_size (union_p2p_elements,
4732  NULL),
4733  GNUNET_MQ_hd_var_size (union_p2p_offer,
4735  struct GNUNET_MessageHeader,
4736  NULL),
4737  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4739  struct InquiryMessage,
4740  NULL),
4741  GNUNET_MQ_hd_var_size (union_p2p_demand,
4743  struct GNUNET_MessageHeader,
4744  NULL),
4745  GNUNET_MQ_hd_fixed_size (union_p2p_done,
4747  struct GNUNET_MessageHeader,
4748  NULL),
4749  GNUNET_MQ_hd_fixed_size (union_p2p_over,
4751  struct GNUNET_MessageHeader,
4752  NULL),
4753  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4755  struct GNUNET_MessageHeader,
4756  NULL),
4757  GNUNET_MQ_hd_var_size (union_p2p_request_full,
4759  struct TransmitFullMessage,
4760  NULL),
4761  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4763  struct StrataEstimatorMessage,
4764  NULL),
4765  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4767  struct StrataEstimatorMessage,
4768  NULL),
4769  GNUNET_MQ_hd_var_size (union_p2p_full_element,
4772  NULL),
4773  GNUNET_MQ_hd_var_size (union_p2p_send_full,
4775  struct TransmitFullMessage,
4776  NULL),
4778  };
4779  struct Listener *listener;
4780 
4781  if (NULL != cs->listener)
4782  {
4783  /* max. one active listener per client! */
4784  GNUNET_break (0);
4786  return;
4787  }
4788  listener = GNUNET_new (struct Listener);
4789  listener->cs = cs;
4790  cs->listener = listener;
4791  listener->app_id = msg->app_id;
4793  listener_tail,
4794  listener);
4796  "New listener created (port %s)\n",
4797  GNUNET_h2s (&listener->app_id));
4798  listener->open_port = GNUNET_CADET_open_port (cadet,
4799  &msg->app_id,
4800  &channel_new_cb,
4801  listener,
4803  &channel_end_cb,
4804  cadet_handlers);
4806 }
4807 
4808 
4816 static void
4818  const struct GNUNET_SETU_RejectMessage *msg)
4819 {
4820  struct ClientState *cs = cls;
4821  struct Operation *op;
4822 
4823  op = get_incoming (ntohl (msg->accept_reject_id));
4824  if (NULL == op)
4825  {
4826  /* no matching incoming operation for this reject;
4827  could be that the other peer already disconnected... */
4829  "Client rejected unknown operation %u\n",
4830  (unsigned int) ntohl (msg->accept_reject_id));
4832  return;
4833  }
4835  "Peer request (app %s) rejected by client\n",
4836  GNUNET_h2s (&cs->listener->app_id));
4839 }
4840 
4841 
4848 static int
4850  const struct GNUNET_SETU_ElementMessage *msg)
4851 {
4852  /* NOTE: Technically, we should probably check with the
4853  block library whether the element we are given is well-formed */
4854  return GNUNET_OK;
4855 }
4856 
4857 
4864 static void
4866  const struct GNUNET_SETU_ElementMessage *msg)
4867 {
4868  struct ClientState *cs = cls;
4869  struct Set *set;
4870  struct GNUNET_SETU_Element el;
4871  struct ElementEntry *ee;
4872  struct GNUNET_HashCode hash;
4873 
4874  if (NULL == (set = cs->set))
4875  {
4876  /* client without a set requested an operation */
4877  GNUNET_break (0);
4879  return;
4880  }
4882  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
4883  el.size = ntohs (msg->header.size) - sizeof(*msg);
4884  el.data = &msg[1];
4885  el.element_type = ntohs (msg->element_type);
4887  &hash);
4889  &hash);
4890  if (NULL == ee)
4891  {
4893  "Client inserts element %s of size %u\n",
4894  GNUNET_h2s (&hash),
4895  el.size);
4896  ee = GNUNET_malloc (el.size + sizeof(*ee));
4897  ee->element.size = el.size;
4898  GNUNET_memcpy (&ee[1], el.data, el.size);
4899  ee->element.data = &ee[1];
4900  ee->element.element_type = el.element_type;
4901  ee->remote = GNUNET_NO;
4902  ee->generation = set->current_generation;
4903  ee->element_hash = hash;
4906  set->content->elements,
4907  &ee->element_hash,
4908  ee,
4910  }
4911  else
4912  {
4914  "Client inserted element %s of size %u twice (ignored)\n",
4915  GNUNET_h2s (&hash),
4916  el.size);
4917  /* same element inserted twice */
4918  return;
4919  }
4921  get_ibf_key (&ee->element_hash));
4922 }
4923 
4924 
4931 static void
4933 {
4934  set->content->latest_generation++;
4935  set->current_generation++;
4936 }
4937 
4938 
4948 static int
4950  const struct GNUNET_SETU_EvaluateMessage *msg)
4951 {
4952  /* FIXME: suboptimal, even if the context below could be NULL,
4953  there are malformed messages this does not check for... */
4954  return GNUNET_OK;
4955 }
4956 
4957 
4966 static void
4968  const struct GNUNET_SETU_EvaluateMessage *msg)
4969 {
4970  struct ClientState *cs = cls;
4971  struct Operation *op = GNUNET_new (struct Operation);
4972 
4973  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4974  GNUNET_MQ_hd_var_size (incoming_msg,
4976  struct OperationRequestMessage,
4977  op),
4978  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4980  struct IBFMessage,
4981  op),
4982  GNUNET_MQ_hd_var_size (union_p2p_elements,
4985  op),
4986  GNUNET_MQ_hd_var_size (union_p2p_offer,
4988  struct GNUNET_MessageHeader,
4989  op),
4990  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4992  struct InquiryMessage,
4993  op),
4994  GNUNET_MQ_hd_var_size (union_p2p_demand,
4996  struct GNUNET_MessageHeader,
4997  op),
4998  GNUNET_MQ_hd_fixed_size (union_p2p_done,
5000  struct GNUNET_MessageHeader,
5001  op),
5002  GNUNET_MQ_hd_fixed_size (union_p2p_over,
5004  struct GNUNET_MessageHeader,
5005  op),
5006  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
5008  struct GNUNET_MessageHeader,
5009  op),
5010  GNUNET_MQ_hd_var_size (union_p2p_request_full,
5012  struct TransmitFullMessage,
5013  op),
5014  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5016  struct StrataEstimatorMessage,
5017  op),
5018  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5020  struct StrataEstimatorMessage,
5021  op),
5022  GNUNET_MQ_hd_var_size (union_p2p_full_element,
5025  op),
5026  GNUNET_MQ_hd_var_size (union_p2p_send_full,
5028  struct TransmitFullMessage,
5029  NULL),
5031  };
5032  struct Set *set;
5033  const struct GNUNET_MessageHeader *context;
5034 
5035  if (NULL == (set = cs->set))
5036  {
5037  GNUNET_break (0);
5038  GNUNET_free (op);
5040  return;
5041  }
5043  UINT32_MAX);
5044  op->peer = msg->target_peer;
5045  op->client_request_id = ntohl (msg->request_id);
5046  op->byzantine = msg->byzantine;
5047  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5048  op->force_full = msg->force_full;
5049  op->force_delta = msg->force_delta;
5050  op->symmetric = msg->symmetric;
5051  op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5052  op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5053  op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5054  op->byzantine_upper_bound = msg->byzantine_upper_bond;
5055  op->active_passive_switch_required = false;
5057 
5058  /* create hashmap for message control */
5059  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5060  GNUNET_NO);
5061  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5062 
5063 #if MEASURE_PERFORMANCE
5064  /* load config */
5065  load_config (op);
5066 #endif
5067 
5068  /* Advance generation values, so that
5069  mutations won't interfere with the running operation. */
5070  op->set = set;
5071  op->generation_created = set->current_generation;
5072  advance_generation (set);
5074  set->ops_tail,
5075  op);
5077  "Creating new CADET channel to port %s for set union\n",
5078  GNUNET_h2s (&msg->app_id));
5079  op->channel = GNUNET_CADET_channel_create (cadet,
5080  op,
5081  &msg->target_peer,
5082  &msg->app_id,
5084  &channel_end_cb,
5085  cadet_handlers);
5086  op->mq = GNUNET_CADET_get_mq (op->channel);
5087  {
5088  struct GNUNET_MQ_Envelope *ev;
5089  struct OperationRequestMessage *msg;
5090 
5091 #if MEASURE_PERFORMANCE
5092  perf_store.operation_request.sent += 1;
5093 #endif
5096  context);
5097  if (NULL == ev)
5098  {
5099  /* the context message is too large */
5100  GNUNET_break (0);
5102  return;
5103  }
5104  op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5105  GNUNET_NO);
5106  /* copy the current generation's strata estimator for this operation */
5107  op->se = strata_estimator_dup (op->set->se);
5108  /* we started the operation, thus we have to send the operation request */
5109  op->phase = PHASE_EXPECT_SE;
5110 
5111  op->salt_receive = (op->peer_site + 1) % 2;
5112  op->salt_send = op->peer_site; // FIXME?????
5113 
5114 
5116  "Initiating union operation evaluation\n");
5118  "# of total union operations",
5119  1,
5120  GNUNET_NO);
5122  "# of initiated union operations",
5123  1,
5124  GNUNET_NO);
5125  GNUNET_MQ_send (op->mq,
5126  ev);
5127  if (NULL != context)
5129  "sent op request with context message\n");
5130  else
5132  "sent op request without context message\n");
5134  op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5135  op->key_to_element);
5136 
5137  }
5139 }
5140 
5141 
5148 static void
5150  const struct GNUNET_SETU_CancelMessage *msg)
5151 {
5152  struct ClientState *cs = cls;
5153  struct Set *set;
5154  struct Operation *op;
5155  int found;
5156 
5157  if (NULL == (set = cs->set))
5158  {
5159  /* client without a set requested an operation */
5160  GNUNET_break (0);
5162  return;
5163  }
5164  found = GNUNET_NO;
5165  for (op = set->ops_head; NULL != op; op = op->next)
5166  {
5167  if (op->client_request_id == ntohl (msg->request_id))
5168  {
5169  found = GNUNET_YES;
5170  break;
5171  }
5172  }
5173  if (GNUNET_NO == found)
5174  {
5175  /* It may happen that the operation was already destroyed due to
5176  * the other peer disconnecting. The client may not know about this
5177  * yet and try to cancel the (just barely non-existent) operation.
5178  * So this is not a hard error.
5179  *///
5181  "Client canceled non-existent op %u\n",
5182  (uint32_t) ntohl (msg->request_id));
5183  }
5184  else
5185  {
5187  "Client requested cancel for op %u\n",
5188  (uint32_t) ntohl (msg->request_id));
5190  }
5192 }
5193 
5194 
5203 static void
5205  const struct GNUNET_SETU_AcceptMessage *msg)
5206 {
5207  struct ClientState *cs = cls;
5208  struct Set *set;
5209  struct Operation *op;
5210  struct GNUNET_SETU_ResultMessage *result_message;
5211  struct GNUNET_MQ_Envelope *ev;
5212  struct Listener *listener;
5213 
5214  if (NULL == (set = cs->set))
5215  {
5216  /* client without a set requested to accept */
5217  GNUNET_break (0);
5219  return;
5220  }
5221  op = get_incoming (ntohl (msg->accept_reject_id));
5222  if (NULL == op)
5223  {
5224  /* It is not an error if the set op does not exist -- it may
5225  * have been destroyed when the partner peer disconnected. */
5226  GNUNET_log (
5228  "Client %p accepted request %u of listener %p that is no longer active\n",
5229  cs,
5230  ntohl (msg->accept_reject_id),
5231  cs->listener);
5232  ev = GNUNET_MQ_msg (result_message,
5234  result_message->request_id = msg->request_id;
5235  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5236  GNUNET_MQ_send (set->cs->mq, ev);
5238  return;
5239  }
5241  "Client accepting request %u\n",
5242  (uint32_t) ntohl (msg->accept_reject_id));
5243  listener = op->listener;
5244  op->listener = NULL;
5246  listener->op_tail,
5247  op);
5248  op->set = set;
5250  set->ops_tail,
5251  op);
5252  op->client_request_id = ntohl (msg->request_id);
5253  op->byzantine = msg->byzantine;
5254  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5255  op->force_full = msg->force_full;
5256  op->force_delta = msg->force_delta;
5257  op->symmetric = msg->symmetric;
5258  op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5259  op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5260  op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5261  op->byzantine_upper_bound = msg->byzantine_upper_bond;
5262  op->active_passive_switch_required = false;
5263  /* create hashmap for message control */
5264  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5265  GNUNET_NO);
5266  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5267 
5268 #if MEASURE_PERFORMANCE
5269  /* load config */
5270  load_config (op);
5271 #endif
5272 
5273  /* Advance generation values, so that future mutations do not
5274  interfer with the running operation. */
5275  op->generation_created = set->current_generation;
5276  advance_generation (set);
5277  GNUNET_assert (NULL == op->se);
5278 
5280  "accepting set union operation\n");
5282  "# of accepted union operations",
5283  1,
5284  GNUNET_NO);
5286  "# of total union operations",
5287  1,
5288  GNUNET_NO);
5289  {
5290  struct MultiStrataEstimator *se;
5291  struct GNUNET_MQ_Envelope *ev;
5292  struct StrataEstimatorMessage *strata_msg;
5293  char *buf;
5294  size_t len;
5295  uint16_t type;
5296 
5297  op->se = strata_estimator_dup (op->set->se);
5298  op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5299  GNUNET_NO);
5300  op->salt_receive = (op->peer_site + 1) % 2;
5301  op->salt_send = op->peer_site; // FIXME?????
5303  op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5304  op->key_to_element);
5305 
5306  /* kick off the operation */
5307  se = op->se;
5308 
5309  uint8_t se_count = 1;
5310  if (op->initial_size > 0)
5311  {
5312  op->total_elements_size_local = 0;
5313  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5314  &
5316  op);
5318  op->total_elements_size_local / op->initial_size,
5319  op->initial_size);
5320  }
5322  * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5325  se_count,
5326  buf);
5327 #if MEASURE_PERFORMANCE
5328  perf_store.se.sent += 1;
5329  perf_store.se.sent_var_bytes += len;
5330 #endif
5331 
5332  if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5335  else
5337  ev = GNUNET_MQ_msg_extra (strata_msg,
5338  len,
5339  type);
5340  GNUNET_memcpy (&strata_msg[1],
5341  buf,
5342  len);
5343  GNUNET_free (buf);
5344  strata_msg->set_size
5346  op->set->content->elements));
5347  strata_msg->se_count = se_count;
5348  GNUNET_MQ_send (op->mq,
5349  ev);
5350  op->phase = PHASE_EXPECT_IBF;
5351  }
5352  /* Now allow CADET to continue, as we did not do this in
5353  #handle_incoming_msg (as we wanted to first see if the
5354  local client would accept the request). */
5355  GNUNET_CADET_receive_done (op->channel);
5357 }
5358 
5359 
5365 static void
5366 shutdown_task (void *cls)
5367 {
5368  /* Delay actual shutdown to allow service to disconnect clients */
5370  if (0 == num_clients)
5371  {
5372  if (NULL != cadet)
5373  {
5375  cadet = NULL;
5376  }
5377  }
5379  GNUNET_YES);
5381  "handled shutdown request\n");
5382 #if MEASURE_PERFORMANCE
5383  calculate_perf_store ();
5384 #endif
5385 }
5386 
5387 
5396 static void
5397 run (void *cls,
5398  const struct GNUNET_CONFIGURATION_Handle *cfg,
5400 {
5401  /* FIXME: need to modify SERVICE (!) API to allow
5402  us to run a shutdown task *after* clients were
5403  forcefully disconnected! */
5405  NULL);
5407  cfg);
5409  if (NULL == cadet)
5410  {
5412  _ ("Could not connect to CADET service\n"));
5414  return;
5415  }
5416 }
5417 
5418 
5423  "set",
5425  &run,
5428  NULL,
5429  GNUNET_MQ_hd_fixed_size (client_accept,
5432  NULL),
5433  GNUNET_MQ_hd_var_size (client_set_add,
5436  NULL),
5437  GNUNET_MQ_hd_fixed_size (client_create_set,
5440  NULL),
5441  GNUNET_MQ_hd_var_size (client_evaluate,
5444  NULL),
5445  GNUNET_MQ_hd_fixed_size (client_listen,
5448  NULL),
5449  GNUNET_MQ_hd_fixed_size (client_reject,
5452  NULL),
5453  GNUNET_MQ_hd_fixed_size (client_cancel,
5456  NULL),
5458 
5459 
5460 /* end of gnunet-service-setu.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
char * getenv()
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static int res
static void done()
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
uint16_t status
See PRISM_STATUS_*-constants.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calcualations.
UnionOperationPhase
Current phase we are in for a union operation.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
#define IBF_MIN_SIZE
Minimal size of an ibf Based on the bsc thesis of Elias Summermatter (2021)
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
@ MSG_CFS_EXPECTED
Track that receiving this message is expected.
@ MSG_CFS_SENT
Track that a message has been sent.
@ MSG_CFS_UNINITIALIZED
Initial message state.
@ MSG_CFS_RECEIVED
Track that message has been received.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
#define PROBABILITY_FOR_NEW_ROUND
Is the estimated probability for a new round this values is based on the bsc thesis of Elias Summerma...
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a over message from a remote peer.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
MESSAGE_TYPE
Message types to track in message control flow.
@ DEMAND_MESSAGE
Demand message type.
@ OFFER_MESSAGE
Offer message type.
@ ELEMENT_MESSAGE
Element message type.
static enum GNUNET_GenericReturnValue check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates the if a message is received in a correct phase.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static enum GNUNET_GenericReturnValue free_values_iter(void *cls, const struct GNUNET_HashCode *key, void *value)
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
#define LOG(kind,...)
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate if a message in differential sync si already received before.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
MODE_OF_OPERATION
Different modes of operations.
@ DIFFERENTIAL_SYNC
Mode just synchronizes the difference between sets.
@ FULL_SYNC_LOCAL_SENDING_FIRST
Mode send full set sending local set first.
@ FULL_SYNC_REMOTE_SENDING_FIRST
Mode request full set from remote peer.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_EXPECT_IBF_LAST
Continuation for multi part IBFs.
@ PHASE_FULL_RECEIVING
Phase that receives full set first and then sends elements that are the local peer missing.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_FINISHED
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_PASSIVE_DECODING
The other peer is decoding the IBF we just sent.
@ PHASE_ACTIVE_DECODING
We are decoding an IBF.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
static char buf[2048]
static unsigned int ibf_size
static unsigned int element_size
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
Constants for network applications operating on top of the CADET service.
CADET service; establish channels to distant peers.
Constants for network protocols.
Two-peer set union operations.
API to create, modify and access statistics.
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(void)
Create a new configuration object.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Generate a random unsigned 64-bit value.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:220
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:70
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
, ' bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
If a value with the given key exists, replace it.
#define GNUNET_log(kind,...)
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
Definition: crypto_hash.c:349
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
Definition: crypto_hash.c:365
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:54
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:37
void GNUNET_CRYPTO_hash_context_finish(struct GNUNET_HashContext *hc, struct GNUNET_HashCode *r_hash)
Finish the hash computation.
Definition: crypto_hash.c:389
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:62
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:86
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:77
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
Definition: gnunet_mq_lib.h:98
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
Signals other peer that all elements are sent.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:562
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1334
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:975
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1272
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2330
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2249
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:890
GNUNET_SETU_Status
Status for the result callback.
@ GNUNET_SETU_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETU_STATUS_ADD_REMOTE
Element should be added to the result set of the remote peer, i.e.
@ GNUNET_SETU_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETU_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
static unsigned int size
Size of the "table".
Definition: peer.c:68
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
const char * name
uint32_t number
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:357
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:229
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:324
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:291
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:80
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:168
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:380
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:404
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
uint8_t ibf_get_max_counter(struct InvertibleBloomFilter *ibf)
Returns the minimal bytes needed to store the counter of the IBF.
Definition: ibf.c:287
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
unsigned int generation
First generation that includes this element.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:136
Handle to a client that is connected to a service.
Definition: service.c:252
Handle to a service.
Definition: service.c:118
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:79
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:350
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: setu.h:41
Message sent by client to the service to add an element to the set.
Definition: setu.h:326
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:335
uint16_t reserved
For alignment, always zero.
Definition: setu.h:340
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:330
Element stored in a set.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: setu.h:203
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:56
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:158
A request for an operation with another client.
Definition: setu.h:175
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:190
uint32_t accept_id
ID of the to identify the request when accepting or rejecting it.
Definition: setu.h:185
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:290
uint64_t current_size
Current set size.
Definition: setu.h:299
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:310
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:315
uint32_t request_id
id the result belongs to
Definition: setu.h:304
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Context for op_get_element_iterator.
struct GNUNET_HashCode hash
FIXME.
struct KeyEntry * k
FIXME.
Message containing buckets of an invertible bloom filter.
Hash of an IBF key.
Definition: ibf.h:55
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:46
uint64_t key_val
Definition: ibf.h:47
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Invertible bloom filter (IBF).
Definition: ibf.h:83
int remote_decoded_count
If an IBF is decoded this count stores how many elements are on the remote site.
Definition: