GNUnet  0.11.x
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
30 #include "ibf.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_cadet_service.h"
37 #include <gcrypt.h>
38 #include "gnunet_setu_service.h"
39 #include "setu.h"
40 
/**
 * Logging shorthand: forwards to GNUNET_log_from() with the fixed
 * component name "setu".
 */
41 #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
42 
/**
 * Timeout constant, defined as one GNUNET_TIME_UNIT_MINUTES.
 * NOTE(review): presumably how long an incoming channel may stay
 * unaccepted; the use site is not visible here -- confirm.
 */
47 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
48 
/**
 * NOTE(review): presumably the number of strata in a strata estimator;
 * no use site is visible here -- confirm.
 */
52 #define SE_STRATA_COUNT 32
53 
54 
/**
 * NOTE(review): presumably the combined size of the IBFs inside a strata
 * estimator; no use site is visible here -- confirm.
 */
59 #define SE_IBFS_TOTAL_SIZE 632
60 
/**
 * NOTE(review): presumably the hash-function count for strata estimator
 * IBFs. In the visible code it only appears in a commented-out
 * ibf_create() call -- confirm it is still used.
 */
64 #define SE_IBF_HASH_NUM 3
65 
/**
 * Upper limit on the number of IBF buckets carried by a single message:
 * 2^16 bytes divided by IBF_BUCKET_SIZE. The IBF transmission loop clamps
 * every chunk to this value.
 */
69 #define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
70 
/**
 * Numeric limit of 2^20.
 * NOTE(review): presumably the largest IBF size that is accepted; no use
 * site is visible here -- confirm.
 */
76 #define MAX_IBF_SIZE 1048576
77 
78 
/**
 * Lower bound for IBF bucket counts: smaller requested sizes are raised to
 * this value before an IBF is prepared for sending, and the cost estimate
 * applies the same floor.
 */
83 #define IBF_MIN_SIZE 37
84 
/**
 * NOTE(review): presumably the mean number of round trips assumed for a
 * differential sync in the cost estimate; the use site is not visible
 * here -- confirm.
 */
89 #define DIFFERENTIAL_RTT_MEAN 3.65145
90 
/**
 * Threshold used by the plausibility checks: log2-scaled values outside
 * the range [-SECURITY_LEVEL, SECURITY_LEVEL] are reported as protocol
 * violations.
 */
95 #define SECURITY_LEVEL 80
96 
/**
 * NOTE(review): presumably the assumed probability that a differential
 * sync needs one more round; the use site is not visible here -- confirm.
 */
102 #define PROBABILITY_FOR_NEW_ROUND 0.15
103 
/**
 * Compile-time switch: a non-zero value enables the code sections guarded
 * by "#if MEASURE_PERFORMANCE" (performance counters and CSV output).
 */
108 #define MEASURE_PERFORMANCE 0
109 
110 
115 {
120 
131 
136 
141 
146 
152 
158 
164 
170 
176 };
177 
183 {
188 
193 
198 };
199 
200 
207 struct ElementEntry
208 {
214 
220 
224  unsigned int generation;
225 
230  int remote;
231 };
232 
233 
238 struct Listener;
239 
240 
244 struct Set;
245 
246 
250 struct ClientState
251 {
255  struct Set *set;
256 
260  struct Listener *listener;
261 
266 
270  struct GNUNET_MQ_Handle *mq;
271 };
272 
273 
277 struct Operation
278 {
279 
284  struct GNUNET_PeerIdentity peer;
285 
289  uint64_t initial_size;
290 
294  struct Operation *next;
295 
299  struct Operation *prev;
300 
305 
309  struct Listener *listener;
310 
314  struct GNUNET_MQ_Handle *mq;
315 
320 
325  struct Set *set;
326 
332 
337 
342 
349 
355 
360 
365 
369  int client_done_sent;
370 
375 
379  uint32_t salt_send;
380 
384  uint32_t salt_receive;
385 
390  uint32_t received_fresh;
391 
395  uint32_t received_total;
396 
400  uint32_t salt;
401 
405  uint32_t remote_element_count;
406 
410  uint32_t client_request_id;
411 
416  int force_delta;
417 
422  int force_full;
423 
428  int byzantine;
429 
435 
441 
447  uint32_t suggest_id;
448 
453  unsigned int generation_created;
454 
455 
460 
461 
466 
467 
473 
479  uint8_t peer_site;
480 
481 
486 
491 
496 
501 
502 
507 
512 
517 
521  uint64_t remote_set_diff;
522 
526  uint64_t local_set_diff;
527 
532 };
533 
534 
539 struct SetContent
540 {
545 
550 
555 
559  unsigned int refcount;
560 
564  unsigned int latest_generation;
565 
569  int iterator_count;
570 };
571 
572 
576 struct Set
577 {
581  struct Set *next;
582 
586  struct Set *prev;
587 
592  struct ClientState *cs;
593 
598  struct SetContent *content;
599 
605 
609  struct Operation *ops_head;
610 
614  struct Operation *ops_tail;
615 
620  unsigned int current_generation;
621 
622 };
623 
624 
628 struct KeyEntry
629 {
633  struct IBF_Key ibf_key;
634 
641  struct ElementEntry *element;
642 
648  int received;
649 };
650 
651 
656 struct SendElementClosure
657 {
662  struct IBF_Key ibf_key;
663 
668  struct Operation *op;
669 };
670 
671 
676 struct Listener
677 {
681  struct Listener *next;
682 
686  struct Listener *prev;
687 
693  struct Operation *op_head;
694 
700  struct Operation *op_tail;
701 
706  struct ClientState *cs;
707 
712 
717  struct GNUNET_HashCode app_id;
718 
719 };
720 
721 
726 static struct GNUNET_CADET_Handle *cadet;
727 
732 
736 static struct Listener *listener_head;
737 
741 static struct Listener *listener_tail;
742 
746 static unsigned int num_clients;
747 
752 static int in_shutdown;
753 
759 static uint32_t suggest_id;
760 
761 #if MEASURE_PERFORMANCE
766 static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767 
768 
/**
 * Traffic counters for one kind of message, kept separately for both
 * directions. Only compiled when MEASURE_PERFORMANCE is non-zero.
 *
 * The message counters are incremented by one per message; the
 * "var_bytes" counters accumulate the variable-length payload that
 * followed the fixed-size message struct (e.g. bucket data or element
 * data).
 */
774 struct perf_num_send_received_msg
775 {
  /* Number of messages of this kind that were sent. */
776  uint64_t sent;
  /* Variable-length payload bytes sent with those messages. */
777  uint64_t sent_var_bytes;
  /* Number of messages of this kind that were received. */
778  uint64_t received;
  /* Variable-length payload bytes received with those messages. */
779  uint64_t received_var_bytes;
780 };
781 
/**
 * Collection of all performance counters of a run, one counter set per
 * message kind plus a few scalar figures. Only compiled when
 * MEASURE_PERFORMANCE is non-zero.
 */
785 struct per_store_struct
786 {
  /* One traffic counter set per message kind; the member names mirror
     the message kinds they account for. */
787  struct perf_num_send_received_msg operation_request;
788  struct perf_num_send_received_msg se;
789  struct perf_num_send_received_msg request_full;
790  struct perf_num_send_received_msg element_full;
791  struct perf_num_send_received_msg full_done;
792  struct perf_num_send_received_msg ibf;
793  struct perf_num_send_received_msg inquery;
794  struct perf_num_send_received_msg element;
795  struct perf_num_send_received_msg demand;
796  struct perf_num_send_received_msg offer;
797  struct perf_num_send_received_msg done;
798  struct perf_num_send_received_msg over;
  /* Difference figures that calculate_perf_store() writes to the CSV
     record. NOTE(review): presumably the estimated set difference
     (total / remote / local); where they are assigned is not visible
     here -- confirm. */
799  uint64_t se_diff;
800  uint64_t se_diff_remote;
801  uint64_t se_diff_local;
  /* Read by calculate_perf_store(): every switch adds 0.5 to the RTT
     figure, and a value of zero marks the run as "decoded". */
802  uint64_t active_passive_switches;
  /* Written as the last column of the CSV record. */
803  uint8_t mode_of_operation;
804 };
805 
/* The single, file-wide instance all counters are recorded in. */
806 struct per_store_struct perf_store;
807 #endif
808 
814 {
819 
824 
829 
834 };
835 
841 {
846 
851 
856 };
857 
858 
863 {
876 };
877 
878 
879 #if MEASURE_PERFORMANCE
880 
886 static void
887 load_config (struct Operation *op)
888 {
889  long long number;
890  float fl;
891 
892  setu_cfg = GNUNET_CONFIGURATION_create ();
893  GNUNET_CONFIGURATION_load (setu_cfg,
894  "perf_setu.conf");
896  "IBF",
897  "BUCKET_NUMBER_FACTOR",
898  &fl);
899  op->ibf_bucket_number_factor = fl;
901  "IBF",
902  "NUMBER_PER_BUCKET",
903  &number);
904  op->ibf_number_buckets_per_element = number;
906  "PERFORMANCE",
907  "TRADEOFF",
908  &number);
909  op->rtt_bandwidth_tradeoff = number;
911  "BOUNDARIES",
912  "UPPER_ELEMENT",
913  &number);
914  op->byzantine_upper_bound = number;
915  op->peer_site = 0;
916 }
917 
918 
925 static int
926 sum_sent_received_bytes (uint64_t size,
927  struct perf_num_send_received_msg
928  perf_num_send_received_msg)
929 {
930  return (size * perf_num_send_received_msg.sent)
931  + (size * perf_num_send_received_msg.received)
932  + perf_num_send_received_msg.sent_var_bytes
933  + perf_num_send_received_msg.received_var_bytes;
934 }
935 
936 
940 static void
941 calculate_perf_store ()
942 {
943 
947  float rtt = 1;
948  int bytes_transmitted = 0;
949 
953  if ((perf_store.element_full.received != 0) ||
954  (perf_store.element_full.sent != 0)
955  )
956  rtt += 1;
957 
958  if ((perf_store.request_full.received != 0) ||
959  (perf_store.request_full.sent != 0)
960  )
961  rtt += 0.5;
962 
967  if ((perf_store.element.received != 0) ||
968  (perf_store.element.sent != 0))
969  {
970  int iterations = perf_store.active_passive_switches;
971 
972  if (iterations > 0)
973  rtt += iterations * 0.5;
974  rtt += 2.5;
975  }
976 
977 
981  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983  perf_store.request_full);
984 
985  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987  perf_store.element_full);
988  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
990  perf_store.element);
991  // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
994  perf_store.se);
995  bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996  bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997  perf_store.ibf);
998  bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999  perf_store.inquery);
1000  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1002  perf_store.demand);
1003  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1005  perf_store.offer);
1006  bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007 
1011  float factor;
1012  GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013  &factor);
1014  long long num_per_bucket;
1015  GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016  &num_per_bucket);
1017 
1018 
1019  int decoded = 0;
1020  if (perf_store.active_passive_switches == 0)
1021  decoded = 1;
1022  int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023  IBFMessage),
1024  perf_store.ibf);
1025 
1026  FILE *out1 = fopen ("perf_data.csv", "a");
1027  fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028  decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029  bytes_transmitted,
1030  perf_store.se_diff_local,perf_store.se_diff_remote,
1031  perf_store.mode_of_operation);
1032  fclose (out1);
1033 
1034 }
1035 
1036 
1037 #endif
1050 static uint8_t
1051 estimate_best_mode_of_operation (uint64_t avg_element_size,
1052  uint64_t local_set_size,
1053  uint64_t remote_set_size,
1054  uint64_t est_set_diff_remote,
1055  uint64_t est_set_diff_local,
1056  uint64_t bandwith_latency_tradeoff,
1057  uint64_t ibf_bucket_number_factor)
1058 {
1059 
1060  /*
1061  * In case of initial sync fall to predefined states
1062  */
1063 
1064  if (0 == local_set_size)
1066  if (0 == remote_set_size)
1068 
1069  /*
1070  * Calculate bytes for full Sync
1071  */
1072 
1073  uint8_t sizeof_full_done_header = 4;
1074  uint8_t sizeof_done_header = 4;
1075  uint8_t rtt_min_full = 2;
1076  uint8_t sizeof_request_full = 4;
1077  uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078 
1079  /* Estimate byte required if we send first */
1080  uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081  + local_set_size;
1082 
1083  uint64_t total_bytes_full_local_send_first = (avg_element_size
1084  *
1085  total_elements_to_send_local_send_first) \
1086  + (
1087  total_elements_to_send_local_send_first * sizeof(struct
1089  + (sizeof_full_done_header * 2) \
1090  + rtt_min_full
1091  * bandwith_latency_tradeoff;
1092 
1093  /* Estimate bytes required if we request from remote peer */
1094  uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095  + remote_set_size;
1096 
1097  uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098  *
1099  total_elements_to_send_remote_send_first) \
1100  + (
1101  total_elements_to_send_remote_send_first * sizeof(struct
1103  + (sizeof_full_done_header * 2) \
1104  + (rtt_min_full + 0.5)
1105  * bandwith_latency_tradeoff \
1106  + sizeof_request_full;
1107 
1108  /*
1109  * Calculate bytes for differential Sync
1110  */
1111 
1112  /* Estimate bytes required by IBF transmission*/
1113 
1114  long double ibf_bucket_count = estimated_total_diff
1115  * ibf_bucket_number_factor;
1116 
1117  if (ibf_bucket_count <= IBF_MIN_SIZE)
1118  {
1119  ibf_bucket_count = IBF_MIN_SIZE;
1120  }
1121  uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122  / ((float) MAX_BUCKETS_PER_MESSAGE));
1123 
1124  uint64_t estimated_counter_size = ceil (
1125  MIN (2 * log2l (((float) local_set_size)
1126  / ((float) ibf_bucket_count)),
1127  log2l (local_set_size)));
1128 
1129  long double counter_bytes = (float) estimated_counter_size / 8;
1130 
1131  uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132  * 1.2 \
1133  + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134  + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135  * 1.2 \
1136  + (ibf_bucket_count * counter_bytes) * 1.2);
1137 
1138  /* Estimate full byte count for differential sync */
1139  uint64_t element_size = (avg_element_size
1140  + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141  * estimated_total_diff;
1142  uint64_t done_size = sizeof_done_header;
1143  uint64_t inquery_size = (sizeof (struct IBF_Key)
1144  + sizeof (struct InquiryMessage))
1145  * estimated_total_diff;
1146  uint64_t demand_size =
1147  (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148  * estimated_total_diff;
1149  uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150  + sizeof (struct GNUNET_MessageHeader))
1151  * estimated_total_diff;
1152 
1153  uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154  + demand_size + offer_size + ibf_bytes) \
1156  * bandwith_latency_tradeoff);
1157 
1158  uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159  total_bytes_full_remote_send_first);
1160 
1161  /* Decide between full and differential sync */
1162 
1163  if (full_min < total_bytes_diff)
1164  {
1165  /* Decide between sending all element first or receiving all elements */
1166  if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167  {
1169  }
1170  else
1171  {
1173  }
1174  }
1175  else
1176  {
1177  return DIFFERENTIAL_SYNC;
1178  }
1179 }
1180 
1181 
1190 static enum GNUNET_GenericReturnValue
1191 check_valid_phase (const uint8_t allowed_phases[],
1192  size_t size_phases,
1193  struct Operation *op)
1194 {
1198  for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199  {
1200  uint8_t phase = allowed_phases[phase_ctr];
1201  if (phase == op->phase)
1202  {
1204  "Message received in valid phase\n");
1205  return GNUNET_YES;
1206  }
1207  }
1209  "Received message in invalid phase: %u\n", op->phase);
1210  return GNUNET_NO;
1211 }
1212 
1213 
1225 static int
1227  enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228  const struct GNUNET_HashCode *hash_code,
1229  enum MESSAGE_TYPE mt)
1230 {
1231  struct messageControlFlowElement *cfe = NULL;
1232  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233 
1238  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239  if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240  {
1241  if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242  {
1244  "Received an element without sent offer!\n");
1245  return GNUNET_NO;
1246  }
1247  /* Check that only requested elements are received! */
1248  if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249  MSG_CFS_SENT))
1250  {
1252  "Received an element that was not demanded\n");
1253  return GNUNET_NO;
1254  }
1255  }
1256 
1261  if (NULL == cfe)
1262  {
1263  cfe = GNUNET_new (struct messageControlFlowElement);
1264  if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265  cfe,
1267  {
1268  GNUNET_free (cfe);
1269  return GNUNET_SYSERR;
1270  }
1271  }
1272 
1277  if (OFFER_MESSAGE == mt)
1278  {
1279  mcfs = &cfe->offer;
1280  }
1281  else if (DEMAND_MESSAGE == mt)
1282  {
1283  mcfs = &cfe->demand;
1284  }
1285  else if (ELEMENT_MESSAGE == mt)
1286  {
1287  mcfs = &cfe->element;
1288  }
1289  else
1290  {
1291  return GNUNET_SYSERR;
1292  }
1293 
1298  if (new_mcfs <= *mcfs)
1299  {
1300  return GNUNET_NO;
1301  }
1302 
1303  *mcfs = new_mcfs;
1304  return GNUNET_YES;
1305 }
1306 
1307 
1315 static int
1318  struct GNUNET_HashCode *hash_code,
1319  enum MESSAGE_TYPE mt)
1320 {
1321  struct messageControlFlowElement *cfe = NULL;
1322  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323 
1324  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325 
1330  if (cfe != NULL)
1331  {
1332  if (OFFER_MESSAGE == mt)
1333  {
1334  mcfs = &cfe->offer;
1335  }
1336  else if (DEMAND_MESSAGE == mt)
1337  {
1338  mcfs = &cfe->demand;
1339  }
1340  else if (ELEMENT_MESSAGE == mt)
1341  {
1342  mcfs = &cfe->element;
1343  }
1344  else
1345  {
1346  return GNUNET_SYSERR;
1347  }
1348 
1352  if (*mcfs != MSG_CFS_UNINITIALIZED)
1353  {
1354  return GNUNET_YES;
1355  }
1356  }
1357  return GNUNET_NO;
1358 }
1359 
1360 
1371 static int
1373  const struct GNUNET_HashCode *key,
1374  void *value)
1375 {
1376  struct messageControlFlowElement *mcfe = value;
1377 
1378  if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379  {
1380  return GNUNET_YES;
1381  }
1382  return GNUNET_NO;
1383 }
1384 
1385 
1395 static int
1397  const struct GNUNET_HashCode *key,
1398  void *value)
1399 {
1400  struct Operation *op = cls;
1401  struct GNUNET_SETU_Element *element = value;
1402  op->total_elements_size_local += element->size;
1403  return GNUNET_YES;
1404 }
1405 
1406 
1416 static int
1418  const struct GNUNET_HashCode *key,
1419  void *value)
1420 {
1421  struct Operation *op = cls;
1422 
1423  struct GNUNET_HashContext *hashed_key_context =
1425  struct GNUNET_HashCode new_key;
1426 
1430  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431  &key,
1432  sizeof(struct IBF_Key));
1433  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434  &op->set->content->elements_randomized_salt,
1435  sizeof(uint32_t));
1436  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437  &new_key);
1438  GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439  &new_key,value,
1441  return GNUNET_YES;
1442 }
1443 
1444 
1455 static int
1457  uint32_t key,
1458  void *value)
1459 {
1460  struct KeyEntry *k = value;
1461 
1462  GNUNET_assert (NULL != k);
1463  if (GNUNET_YES == k->element->remote)
1464  {
1465  GNUNET_free (k->element);
1466  k->element = NULL;
1467  }
1468  GNUNET_free (k);
1469  return GNUNET_YES;
1470 }
1471 
1472 
1479 static void
1480 send_client_done (void *cls)
1481 {
1482  struct Operation *op = cls;
1483  struct GNUNET_MQ_Envelope *ev;
1484  struct GNUNET_SETU_ResultMessage *rm;
1485 
1486  if (GNUNET_YES == op->client_done_sent)
1487  return;
1488  if (PHASE_FINISHED != op->phase)
1489  {
1491  "Union operation failed\n");
1493  "# Union operations failed",
1494  1,
1495  GNUNET_NO);
1498  rm->request_id = htonl (op->client_request_id);
1499  rm->element_type = htons (0);
1500  GNUNET_MQ_send (op->set->cs->mq,
1501  ev);
1502  return;
1503  }
1504 
1505  op->client_done_sent = GNUNET_YES;
1506 
1508  "# Union operations succeeded",
1509  1,
1510  GNUNET_NO);
1512  "Signalling client that union operation is done\n");
1513  ev = GNUNET_MQ_msg (rm,
1515  rm->request_id = htonl (op->client_request_id);
1516  rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1517  rm->element_type = htons (0);
1519  op->key_to_element));
1520  GNUNET_MQ_send (op->set->cs->mq,
1521  ev);
1522 }
1523 
1524 
1531 static int
1533 {
1534  if (op->byzantine != GNUNET_YES)
1535  return GNUNET_OK;
1536 
1540  if (op->remote_element_count + op->remote_set_diff >
1541  op->byzantine_upper_bound)
1542  return GNUNET_SYSERR;
1543  if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544  return GNUNET_SYSERR;
1545 
1549  if (op->remote_element_count < op->byzantine_lower_bound)
1550  return GNUNET_SYSERR;
1551  return GNUNET_OK;
1552 }
1553 
1554 
1555 /* FIXME: the destroy logic is a mess and should be cleaned up! */
1556 
1569 static void
1571 {
1572  struct Set *set = op->set;
1573  struct GNUNET_CADET_Channel *channel;
1574 
1576  "Destroying union operation %p\n",
1577  op);
1578  GNUNET_assert (NULL == op->listener);
1579  /* check if the op was canceled twice */
1580  if (NULL != op->remote_ibf)
1581  {
1582  ibf_destroy (op->remote_ibf);
1583  op->remote_ibf = NULL;
1584  }
1585  if (NULL != op->demanded_hashes)
1586  {
1587  GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1588  op->demanded_hashes = NULL;
1589  }
1590  if (NULL != op->local_ibf)
1591  {
1592  ibf_destroy (op->local_ibf);
1593  op->local_ibf = NULL;
1594  }
1595  if (NULL != op->se)
1596  {
1598  op->se = NULL;
1599  }
1600  if (NULL != op->key_to_element)
1601  {
1604  NULL);
1605  GNUNET_CONTAINER_multihashmap32_destroy (op->key_to_element);
1606  op->key_to_element = NULL;
1607  }
1608  if (NULL != set)
1609  {
1611  set->ops_tail,
1612  op);
1613  op->set = NULL;
1614  }
1615  if (NULL != op->context_msg)
1616  {
1617  GNUNET_free (op->context_msg);
1618  op->context_msg = NULL;
1619  }
1620  if (NULL != (channel = op->channel))
1621  {
1622  /* This will free op; called conditionally as this helper function
1623  is also called from within the channel disconnect handler. */
1624  op->channel = NULL;
1625  GNUNET_CADET_channel_destroy (channel);
1626  }
1627  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1628  * there was a channel end handler that will free 'op' on the call stack. */
1629 }
1630 
1631 
1637 static void
1639 
1640 
1646 static void
1648 {
1649  struct Listener *listener;
1650 
1652  "Destroying incoming operation %p\n",
1653  op);
1654  if (NULL != (listener = op->listener))
1655  {
1657  listener->op_tail,
1658  op);
1659  op->listener = NULL;
1660  }
1661  if (NULL != op->timeout_task)
1662  {
1663  GNUNET_SCHEDULER_cancel (op->timeout_task);
1664  op->timeout_task = NULL;
1665  }
1667 }
1668 
1669 
1675 static void
1677 {
1678  struct GNUNET_CADET_Channel *channel;
1679 
1680  if (NULL != (channel = op->channel))
1681  {
1682  /* This will free op; called conditionally as this helper function
1683  is also called from within the channel disconnect handler. */
1684  op->channel = NULL;
1685  GNUNET_CADET_channel_destroy (channel);
1686  }
1687  if (NULL != op->listener)
1688  {
1689  incoming_destroy (op);
1690  return;
1691  }
1692  if (NULL != op->set)
1693  send_client_done (op);
1695  GNUNET_free (op);
1696 }
1697 
1698 
1705 static void
1707 {
1708  struct GNUNET_MQ_Envelope *ev;
1710 
1712  "union operation failed\n");
1714  msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1715  msg->request_id = htonl (op->client_request_id);
1716  msg->element_type = htons (0);
1717  GNUNET_MQ_send (op->set->cs->mq,
1718  ev);
1720 }
1721 
1722 
1733 static void
1735 {
1736  if (GNUNET_YES != op->byzantine)
1737  return;
1738 
1739  int security_level_lb = -1 * SECURITY_LEVEL;
1740  uint64_t duplicates = op->received_fresh - op->received_total;
1741 
1742  /*
1743  * Protect full sync from receiving double element when in FULL SENDING
1744  */
1745  if (PHASE_FULL_SENDING == op->phase)
1746  {
1747  if (duplicates > 0)
1748  {
1750  "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1751  "mode of operation this is not allowed! Duplicates: %llu\n",
1752  (unsigned long long) duplicates);
1753  GNUNET_break_op (0);
1755  return;
1756  }
1757 
1758  }
1759 
1760  /*
1761  * Protect full sync with probabilistic algorithm
1762  */
1763  if (PHASE_FULL_RECEIVING == op->phase)
1764  {
1765  if (0 == op->remote_set_diff)
1766  op->remote_set_diff = 1;
1767 
1768  long double base = (1 - (long double) (op->remote_set_diff
1769  / (long double) (op->initial_size
1770  + op->
1771  remote_set_diff)));
1772  long double exponent = (op->received_total - (op->received_fresh * ((long
1773  double)
1774  op->
1775  initial_size
1776  / (long
1777  double)
1778  op->
1779  remote_set_diff)));
1780  long double value = exponent * (log2l (base) / log2l (2));
1781  if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1782  {
1784  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1785  "to many duplicated full element : %LF\n",
1786  value);
1787  GNUNET_break_op (0);
1789  return;
1790  }
1791  }
1792 }
1793 
1794 
1799 static void
1801 {
1802  double probability = op->differential_sync_iterations * (log2l (
1804  / log2l (2));
1805  if ((-1 * SECURITY_LEVEL) > probability)
1806  {
1808  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1809  "switches in differential sync: %u\n",
1810  op->differential_sync_iterations);
1811  GNUNET_break_op (0);
1813  return;
1814  }
1815 }
1816 
1817 
1825 static struct IBF_Key
1826 get_ibf_key (const struct GNUNET_HashCode *src)
1827 {
1828  struct IBF_Key key;
1829  uint16_t salt = 0;
1830 
1832  GNUNET_CRYPTO_kdf (&key, sizeof(key),
1833  src, sizeof *src,
1834  &salt, sizeof(salt),
1835  NULL, 0));
1836  return key;
1837 }
1838 
1839 
1843 struct GetElementContext
1844 {
1848  struct GNUNET_HashCode hash;
1849 
1853  struct KeyEntry *k;
1854 };
1855 
1856 
1867 static int
1869  uint32_t key,
1870  void *value)
1871 {
1872  struct GetElementContext *ctx = cls;
1873  struct KeyEntry *k = value;
1874 
1875  GNUNET_assert (NULL != k);
1877  &ctx->hash))
1878  {
1879  ctx->k = k;
1880  return GNUNET_NO;
1881  }
1882  return GNUNET_YES;
1883 }
1884 
1885 
1894 static struct KeyEntry *
1896  const struct GNUNET_HashCode *element_hash)
1897 {
1898  int ret;
1899  struct IBF_Key ibf_key;
1900  struct GetElementContext ctx = { { { 0 } }, 0 };
1901 
1902  ctx.hash = *element_hash;
1903 
1904  ibf_key = get_ibf_key (element_hash);
1906  (uint32_t) ibf_key.key_val,
1908  &ctx);
1909 
1910  /* was the iteration aborted because we found the element? */
1911  if (GNUNET_SYSERR == ret)
1912  {
1913  GNUNET_assert (NULL != ctx.k);
1914  return ctx.k;
1915  }
1916  return NULL;
1917 }
1918 
1919 
1934 static void
1936  struct ElementEntry *ee,
1937  int received)
1938 {
1939  struct IBF_Key ibf_key;
1940  struct KeyEntry *k;
1941 
1942  ibf_key = get_ibf_key (&ee->element_hash);
1943  k = GNUNET_new (struct KeyEntry);
1944  k->element = ee;
1945  k->ibf_key = ibf_key;
1946  k->received = received;
1948  GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1949  (uint32_t) ibf_key.key_val,
1950  k,
1952 }
1953 
1954 
1959 static void
1960 salt_key (const struct IBF_Key *k_in,
1961  uint32_t salt,
1962  struct IBF_Key *k_out)
1963 {
1964  int s = (salt * 7) % 64;
1965  uint64_t x = k_in->key_val;
1966 
1967  /* rotate ibf key */
1968  x = (x >> s) | (x << (64 - s));
1969  k_out->key_val = x;
1970 }
1971 
1972 
1976 static void
1977 unsalt_key (const struct IBF_Key *k_in,
1978  uint32_t salt,
1979  struct IBF_Key *k_out)
1980 {
1981  int s = (salt * 7) % 64;
1982  uint64_t x = k_in->key_val;
1983 
1984  x = (x << s) | (x >> (64 - s));
1985  k_out->key_val = x;
1986 }
1987 
1988 
1996 static int
1998  uint32_t key,
1999  void *value)
2000 {
2001  struct Operation *op = cls;
2002  struct KeyEntry *ke = value;
2003  struct IBF_Key salted_key;
2004 
2006  "[OP %p] inserting %lx (hash %s) into ibf\n",
2007  op,
2008  (unsigned long) ke->ibf_key.key_val,
2009  GNUNET_h2s (&ke->element->element_hash));
2010  salt_key (&ke->ibf_key,
2011  op->salt_send,
2012  &salted_key);
2013  ibf_insert (op->local_ibf, salted_key);
2014  return GNUNET_YES;
2015 }
2016 
2017 
2025 static int
2027  struct Operation *op)
2028 {
2029  return ee->generation >= op->generation_created;
2030 }
2031 
2032 
2043 static int
2045  const struct GNUNET_HashCode *key,
2046  void *value)
2047 {
2048  struct Operation *op = cls;
2049  struct ElementEntry *ee = value;
2050 
2051  /* make sure that the element belongs to the set at the time
2052  * of creating the operation */
2053  if (GNUNET_NO ==
2055  op))
2056  return GNUNET_YES;
2057  GNUNET_assert (GNUNET_NO == ee->remote);
2059  ee,
2060  GNUNET_NO);
2061  return GNUNET_YES;
2062 }
2063 
2064 
2070 static void
2072 {
2073  unsigned int len;
2074 
2075  GNUNET_assert (NULL == op->key_to_element);
2076  len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2077  op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2078  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2080  op);
2081 }
2082 
2083 
2092 static int
2094  uint32_t size)
2095 {
2096  GNUNET_assert (NULL != op->key_to_element);
2097 
2098  if (NULL != op->local_ibf)
2099  ibf_destroy (op->local_ibf);
2100  // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2101  op->local_ibf = ibf_create (size,
2102  ((uint8_t) op->ibf_number_buckets_per_element));
2103  if (NULL == op->local_ibf)
2104  {
2106  "Failed to allocate local IBF\n");
2107  return GNUNET_SYSERR;
2108  }
2111  op);
2112  return GNUNET_OK;
2113 }
2114 
2115 
2125 static int
2127  uint32_t ibf_size)
2128 {
2129  uint64_t buckets_sent = 0;
2130  struct InvertibleBloomFilter *ibf;
2131  op->differential_sync_iterations++;
2132 
2136  uint32_t ibf_min_size = IBF_MIN_SIZE;
2137 
2138  if (ibf_size < ibf_min_size)
2139  {
2140  ibf_size = ibf_min_size;
2141  }
2142  if (GNUNET_OK !=
2143  prepare_ibf (op, ibf_size))
2144  {
2145  /* allocation failed */
2146  return GNUNET_SYSERR;
2147  }
2148 
2150  "sending ibf of size %u\n",
2151  (unsigned int) ibf_size);
2152 
2153  {
2154  char name[64];
2155 
2157  sizeof(name),
2158  "# sent IBF (order %u)",
2159  ibf_size);
2161  }
2162 
2163  ibf = op->local_ibf;
2164 
2165  while (buckets_sent < ibf_size)
2166  {
2167  unsigned int buckets_in_message;
2168  struct GNUNET_MQ_Envelope *ev;
2169  struct IBFMessage *msg;
2170 
2171  buckets_in_message = ibf_size - buckets_sent;
2172  /* limit to maximum */
2173  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2174  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2175 
2176 #if MEASURE_PERFORMANCE
2177  perf_store.ibf.sent += 1;
2178  perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2179 #endif
2180  ev = GNUNET_MQ_msg_extra (msg,
2181  buckets_in_message * IBF_BUCKET_SIZE,
2183  msg->ibf_size = ibf_size;
2184  msg->offset = htonl (buckets_sent);
2185  msg->salt = htonl (op->salt_send);
2186  msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2187 
2188 
2189  ibf_write_slice (ibf, buckets_sent,
2190  buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2191  buckets_sent += buckets_in_message;
2193  "ibf chunk size %u, %llu/%u sent\n",
2194  (unsigned int) buckets_in_message,
2195  (unsigned long long) buckets_sent,
2196  (unsigned int) ibf_size);
2197  GNUNET_MQ_send (op->mq, ev);
2198  }
2199 
2200  /* The other peer must decode the IBF, so
2201  * we're passive. */
2202  op->phase = PHASE_PASSIVE_DECODING;
2203  return GNUNET_OK;
2204 }
2205 
2206 
/**
 * Derive an IBF size from a set difference.
 *
 * The difference is scaled by @a ibf_bucket_number_factor, truncated to
 * an int, and the lowest bit is set, so the returned size is always odd.
 *
 * @param diff set difference the IBF has to cope with
 * @param number_buckets_per_element not referenced by the return
 *        expression
 * @param ibf_bucket_number_factor multiplier applied to @a diff
 * @return odd IBF size derived from @a diff
 */
2214 static unsigned int
2215 get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2216  float ibf_bucket_number_factor)
2217 {
  /* "| 1" forces the result to be odd. */
2220  return (((int) (diff * ibf_bucket_number_factor)) | 1);
2221 
2222 }
2223 
2224 
/**
 * Compute the size of the next IBF from the previous one.
 *
 * The candidate size is twice @a last_ibf_size minus
 * @a ibf_bucket_number_factor times @a decoded_elements, converted to
 * unsigned int; the lowest bit is then set so the result is always odd.
 *
 * NOTE(review): if ibf_bucket_number_factor * decoded_elements exceeds
 * 2 * last_ibf_size the intermediate floating-point value is negative and
 * its conversion to unsigned int is undefined -- confirm that callers
 * rule this out.
 *
 * @param ibf_bucket_number_factor multiplier applied to the number of
 *        decoded elements
 * @param decoded_elements number of elements counted as decoded
 * @param last_ibf_size size of the previous IBF
 * @return odd size for the next IBF
 */
2225 static unsigned int
2226 get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
2227  decoded_elements, unsigned int last_ibf_size)
2228 {
2229  unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2230  - (ibf_bucket_number_factor
2231  * decoded_elements));
  /* "| 1" forces the result to be odd. */
2234  return next_size | 1;
2235 }
2236 
2237 
2247 static int
2249  const struct GNUNET_HashCode *key,
2250  void *value)
2251 {
2252  struct Operation *op = cls;
2253  struct GNUNET_SETU_ElementMessage *emsg;
2254  struct ElementEntry *ee = value;
2255  struct GNUNET_SETU_Element *el = &ee->element;
2256  struct GNUNET_MQ_Envelope *ev;
2257 
2259  "Sending element %s\n",
2260  GNUNET_h2s (key));
2261 #if MEASURE_PERFORMANCE
2262  perf_store.element_full.received += 1;
2263  perf_store.element_full.received_var_bytes += el->size;
2264 #endif
2265  ev = GNUNET_MQ_msg_extra (emsg,
2266  el->size,
2268  emsg->element_type = htons (el->element_type);
2269  GNUNET_memcpy (&emsg[1],
2270  el->data,
2271  el->size);
2272  GNUNET_MQ_send (op->mq,
2273  ev);
2274  return GNUNET_YES;
2275 }
2276 
2277 
2283 static void
2285 {
2286  struct GNUNET_MQ_Envelope *ev;
2287 
2288  op->phase = PHASE_FULL_SENDING;
2290  "Dedicing to transmit the full set\n");
2291  /* FIXME: use a more memory-friendly way of doing this with an
2292  iterator, just as we do in the non-full case! */
2293 
2294  // Randomize Elements to send
2295  op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2296  32,GNUNET_NO);
2297  op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2299  UINT64_MAX);
2300  (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2301  &
2303  op);
2304 
2306  op->set->content->elements_randomized,
2308  op);
2309 #if MEASURE_PERFORMANCE
2310  perf_store.full_done.sent += 1;
2311 #endif
2313  GNUNET_MQ_send (op->mq,
2314  ev);
2315 }
2316 
2317 
/**
 * Validate an incoming strata estimator message before it is handled.
 *
 * The message is only accepted while the operation is in #PHASE_EXPECT_SE.
 * For uncompressed messages a further check on the payload length is made
 * (its condition is on a line missing from this listing).
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter; `cls` is assumed to be that parameter.
 *
 * @param cls the `struct Operation`
 * @param msg the strata estimator message
 * @return #GNUNET_OK if the message is acceptable, #GNUNET_SYSERR otherwise
 */
2324 static int
2326  const struct StrataEstimatorMessage *msg)
2327 {
2328  struct Operation *op = cls;
2329  int is_compressed;
2330  size_t len;
2331 
2332  if (op->phase != PHASE_EXPECT_SE)
2333  {
2334  GNUNET_break (0);
2335  return GNUNET_SYSERR;
2336  }
/* NOTE(review): htons() is applied to a field of a received message;
 * ntohs() would be the conventional spelling. */
2337  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2338  msg->header.type));
/* Payload length: total message size minus the fixed header struct. */
2339  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2340  if ((GNUNET_NO == is_compressed) &&
2342  {
2343  GNUNET_break (0);
2344  return GNUNET_SYSERR;
2345  }
2346  return GNUNET_OK;
2347 }
2348 
2349 
/**
 * Handle a strata estimator message from the other peer.
 *
 * Steps visible in this listing:
 * - record the local element count and set `op->peer_site` to 1;
 * - require #PHASE_EXPECT_SE and a strata-estimator count that is a power
 *   of two and at most 8;
 * - reject a remote set size above `op->byzantine_upper_bound`;
 * - read the remote estimator, take the difference against `op->se` and
 *   derive the remote/local difference estimates, clamped to the upper
 *   bound;
 * - choose the mode of operation via estimate_best_mode_of_operation();
 * - either start a full-set exchange (sending first, or asking the other
 *   peer to send) or send an IBF sized from the estimated difference.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`) as well as several call lines
 * (logging, statistics and the failure handling before each early return).
 *
 * @param cls the `struct Operation`
 * @param msg the strata estimator message
 */
2356 static void
2358  const struct StrataEstimatorMessage *msg)
2359 {
2360 #if MEASURE_PERFORMANCE
2361  perf_store.se.received += 1;
2362  perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2364 #endif
2365  struct Operation *op = cls;
2366  struct MultiStrataEstimator *remote_se;
2367  unsigned int diff;
2368  uint64_t other_size;
2369  size_t len;
2370  int is_compressed;
2371  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2372  op->set->content->elements);
2373  // Setting peer site to receiving peer
2374  op->peer_site = 1;
2375 
2379  uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2380  if (GNUNET_OK !=
2381  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2382  {
2383  GNUNET_break (0);
2385  return;
2386  }
2387 
/* Only 1, 2, 4 or 8 strata estimators are accepted: exactly one bit set
 * and a value of at most 8. */
2389  if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2390  {
2392  "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2393  msg->se_count);
2394  GNUNET_break_op (0);
2396  return;
2397  }
2398 
2399  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2400  msg->header.type));
2402  "# bytes of SE received",
2403  ntohs (msg->header.size),
2404  GNUNET_NO);
2405  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2406  other_size = GNUNET_ntohll (msg->set_size);
2407  op->remote_element_count = other_size;
2408 
2409  if (op->byzantine_upper_bound < op->remote_element_count)
2410  {
2412  "Exceeded configured upper bound <%lu> of element: %u\n",
2413  op->byzantine_upper_bound,
2414  op->remote_element_count);
2416  return;
2417  }
2418 
2421  SE_IBF_HASH_NUM);
2422  if (NULL == remote_se)
2423  {
2424  /* insufficient resources, fail */
2426  return;
2427  }
2428  if (GNUNET_OK !=
2430  len,
2431  is_compressed,
2432  msg->se_count,
2434  remote_se))
2435  {
2436  /* decompression failed */
2437  strata_estimator_destroy (remote_se);
2439  return;
2440  }
2441  GNUNET_assert (NULL != op->se);
2442  strata_estimator_difference (remote_se,
2443  op->se);
2444 
2445  /* Calculate remote local diff */
2446  long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2447  long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2448 
2449  /* Prevent estimations from overshooting max element */
2450  if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2451  diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2452  if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2453  diff_local = op->byzantine_upper_bound - op->local_element_count;
2454  if ((diff_remote < 0) || (diff_local < 0))
2455  {
2456  strata_estimator_destroy (remote_se);
2458  "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2459  "malicious: remote diff %ld, local diff: %ld\n",
2460  diff_remote, diff_local);
2461  GNUNET_break_op (0);
2463  return;
2464  }
2465 
2466  /* Make estimation more precise in initial sync cases */
2467  if (0 == op->remote_element_count)
2468  {
2469  diff_remote = 0;
2470  diff_local = op->local_element_count;
2471  }
2472  if (0 == op->local_element_count)
2473  {
2474  diff_local = 0;
2475  diff_remote = op->remote_element_count;
2476  }
2477 
2478  diff = diff_remote + diff_local;
2479  op->remote_set_diff = diff_remote;
2480 
/* Average local element size; stays 0 when the local set is empty, which
 * also avoids a division by zero. */
2482  uint64_t avg_element_size = 0;
2483  if (0 < op->local_element_count)
2484  {
2485  op->total_elements_size_local = 0;
2486  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2487  &
2489  op);
2490  avg_element_size = op->total_elements_size_local / op->local_element_count;
2491  }
2492 
2493  op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2495  op->set->content->
2496  elements),
2497  op->
2498  remote_element_count,
2499  diff_remote,
2500  diff_local,
2501  op->
2502  rtt_bandwidth_tradeoff,
2503  op->
2504  ibf_bucket_number_factor);
2505 
2506 #if MEASURE_PERFORMANCE
2507  perf_store.se_diff_local = diff_local;
2508  perf_store.se_diff_remote = diff_remote;
2509  perf_store.se_diff = diff;
2510  perf_store.mode_of_operation = op->mode_of_operation;
2511 #endif
2512 
2513  strata_estimator_destroy (remote_se);
2515  op->se = NULL;
2517  "got se diff=%d, using ibf size %d\n",
2518  diff,
2519  1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2520  op->ibf_bucket_number_factor));
2521 
2522  {
2523  char *set_debug;
2524 
/* Optional benchmark output: append the estimated difference to "set.log"
 * when GNUNET_SETU_BENCHMARK is set to "1". */
2525  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2526  if ((NULL != set_debug) &&
2527  (0 == strcmp (set_debug, "1")))
2528  {
/* NOTE(review): the fopen() result is used without a NULL check. */
2529  FILE *f = fopen ("set.log", "a");
2530  fprintf (f, "%llu\n", (unsigned long long) diff);
2531  fclose (f);
2532  }
2533  }
2534 
2535  if ((GNUNET_YES == op->byzantine) &&
2536  (other_size < op->byzantine_lower_bound))
2537  {
2538  GNUNET_break (0);
2540  return;
2541  }
2542 
2543  if ((GNUNET_YES == op->force_full) ||
2544  (op->mode_of_operation != DIFFERENTIAL_SYNC))
2545  {
2547  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2548  diff,
2549  (unsigned long long) op->initial_size);
2551  "# of full sends",
2552  1,
2553  GNUNET_NO);
2554  if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2555  {
/* We send first: announce our view of the differences, then the set. */
2556  struct TransmitFullMessage *signal_msg;
2557  struct GNUNET_MQ_Envelope *ev;
2558  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2560  signal_msg->remote_set_difference = htonl (diff_local);
2561  signal_msg->remote_set_size = htonl (op->local_element_count);
2562  signal_msg->local_set_difference = htonl (diff_remote);
2563  GNUNET_MQ_send (op->mq,
2564  ev);
2565  send_full_set (op);
2566  }
2567  else
2568  {
2569  struct GNUNET_MQ_Envelope *ev;
2570 
2572  "Telling other peer that we expect its full set\n");
2573  op->phase = PHASE_FULL_RECEIVING;
2574 #if MEASURE_PERFORMANCE
2575  perf_store.request_full.sent += 1;
2576 #endif
2577  struct TransmitFullMessage *signal_msg;
2578  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2580  signal_msg->remote_set_difference = htonl (diff_local);
2581  signal_msg->remote_set_size = htonl (op->local_element_count);
2582  signal_msg->local_set_difference = htonl (diff_remote);
2583  GNUNET_MQ_send (op->mq,
2584  ev);
2585  }
2586  }
2587  else
2588  {
2590  "# of ibf sends",
2591  1,
2592  GNUNET_NO);
2593  if (GNUNET_OK !=
2594  send_ibf (op,
2596  op->ibf_number_buckets_per_element,
2597  op->ibf_bucket_number_factor)))
2598  {
2599  /* Internal error, best we can do is shut the connection */
2601  "Failed to send IBF, closing connection\n");
2603  return;
2604  }
2605  }
2606  GNUNET_CADET_receive_done (op->channel);
2607 }
2608 
2609 
/**
 * Callback invoked per key entry that matches a 32-bit map key; sends an
 * element offer (the element's hash) for the entry.
 *
 * Entries whose full IBF key differs from the requested one only flag
 * `op->active_passive_switch_required`.  Entries whose offer was already
 * recorded in `op->message_control_flow` are skipped.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`) and the names of the control-flow
 * helpers that are called.
 *
 * @param cls the `struct SendElementClosure` (operation and IBF key)
 * @param key 32-bit map key (unused in the visible code)
 * @param value the `struct KeyEntry`
 * @return #GNUNET_YES to continue, #GNUNET_NO after a bookkeeping failure
 */
2617 static int
2619  uint32_t key,
2620  void *value)
2621 {
2622  struct SendElementClosure *sec = cls;
2623  struct Operation *op = sec->op;
2624  struct KeyEntry *ke = value;
2625  struct GNUNET_MQ_Envelope *ev;
2626  struct GNUNET_MessageHeader *mh;
2627 
2628  /* Detect 32-bit key collision for the 64-bit IBF keys. */
2629  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2630  {
2631  op->active_passive_switch_required = true;
2632  return GNUNET_YES;
2633  }
2634 
2635  /* Prevent the implementation from sending an offer multiple times in case of a role switch */
2636  if (GNUNET_YES ==
2638  op->message_control_flow,
2639  &ke->element->element_hash,
2640  OFFER_MESSAGE)
2641  )
2642  {
2644  "Skipping already sent processed element offer!\n");
2645  return GNUNET_YES;
2646  }
2647 
2648  /* Save send offer message for message control */
2649  if (GNUNET_YES !=
2651  op->message_control_flow,
2652  MSG_CFS_SENT,
2653  &ke->element->element_hash,
2654  OFFER_MESSAGE)
2655  )
2656  {
2658  "Double offer message sent found!\n");
2659  GNUNET_break (0);
2661  return GNUNET_NO;
2662  }
2663  ;
2664 
2665  /* Mark element as expected to be received */
2666  if (GNUNET_YES !=
2668  op->message_control_flow,
2670  &ke->element->element_hash,
2672  )
2673  {
2675  "Double demand received found!\n");
2676  GNUNET_break (0);
2678  return GNUNET_NO;
2679  }
2680  ;
2681 #if MEASURE_PERFORMANCE
2682  perf_store.offer.sent += 1;
2683  perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2684 #endif
2686  sizeof(struct GNUNET_HashCode),
2688  GNUNET_assert (NULL != ev);
/* The offer payload is the element hash placed right after the header. */
2689  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2691  "[OP %p] sending element offer (%s) to peer\n",
2692  op,
2693  GNUNET_h2s (&ke->element->element_hash));
2694  GNUNET_MQ_send (op->mq, ev);
2695  return GNUNET_YES;
2696 }
2697 
2698 
/**
 * Look up the entries of `op->key_to_element` stored under the low 32 bits
 * of @a ibf_key and run a callback on them with a closure carrying the
 * operation and the full key.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `op`), the lookup call and its callback.
 *
 * @param ibf_key IBF key whose matching elements are to be processed
 */
2705 void
2707  struct IBF_Key ibf_key)
2708 {
2709  struct SendElementClosure send_cls;
2710 
2711  send_cls.ibf_key = ibf_key;
2712  send_cls.op = op;
2714  op->key_to_element,
2715  (uint32_t) ibf_key.
2716  key_val,
2718  &send_cls);
2719 }
2720 
2721 
/**
 * Decode the difference between the local IBF and the received remote IBF
 * and act on every decoded key.
 *
 * The remote IBF is subtracted from a copy of the local IBF and destroyed.
 * Keys are then decoded in a loop:
 * - side 1: the key is unsalted with `op->salt_receive` and passed on (the
 *   callee is on a line missing from this listing);
 * - side -1: an inquiry message for the key is sent;
 * - decoding finished (#GNUNET_NO): a final message is queued and the loop
 *   ends;
 * - decoding failed or a cycle was detected: a larger IBF is sent, unless
 *   the next size would exceed #MAX_IBF_SIZE, in which case the function
 *   fails.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * parameter (assumed `struct Operation *op`) and several call lines.
 *
 * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
 */
2729 static int
2731 {
2732  struct IBF_Key key;
2733  struct IBF_Key last_key;
2734  int side;
2735  unsigned int num_decoded;
2736  struct InvertibleBloomFilter *diff_ibf;
2737 
2739 
2740  if (GNUNET_OK !=
2741  prepare_ibf (op,
2742  op->remote_ibf->size))
2743  {
2744  GNUNET_break (0);
2745  /* allocation failed */
2746  return GNUNET_SYSERR;
2747  }
2748 
2749  diff_ibf = ibf_dup (op->local_ibf);
2750  ibf_subtract (diff_ibf,
2751  op->remote_ibf);
2752 
2753  ibf_destroy (op->remote_ibf);
2754  op->remote_ibf = NULL;
2755 
2757  "decoding IBF (size=%u)\n",
2758  diff_ibf->size);
2759 
2760  num_decoded = 0;
2761  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2762 
2763  while (1)
2764  {
2765  int res;
2766  int cycle_detected = GNUNET_NO;
2767 
2768  last_key = key;
2769 
2770  res = ibf_decode (diff_ibf,
2771  &side,
2772  &key);
2773  if (res == GNUNET_OK)
2774  {
2776  "decoded ibf key %lx\n",
2777  (unsigned long) key.key_val);
2778  num_decoded += 1;
/* A cycle is assumed when more keys were decoded than the IBF has
 * buckets, or when the same key is decoded twice in a row. */
2779  if ((num_decoded > diff_ibf->size) ||
2780  ((num_decoded > 1) &&
2781  (last_key.key_val == key.key_val)))
2782  {
2784  "detected cyclic ibf (decoded %u/%u)\n",
2785  num_decoded,
2786  diff_ibf->size);
2787  cycle_detected = GNUNET_YES;
2788  }
2789  }
2790  if ((GNUNET_SYSERR == res) ||
2791  (GNUNET_YES == cycle_detected))
2792  {
2793  uint32_t next_size;
2796  next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2797  diff_ibf->size);
/* Enforce the (odd) minimum IBF size. */
2800  uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2801 
2802  if (next_size<ibf_min_size)
2803  next_size = ibf_min_size;
2804 
2805 
2806  if (next_size <= MAX_IBF_SIZE)
2807  {
2809  "decoding failed, sending larger ibf (size %u)\n",
2810  next_size);
2812  "# of IBF retries",
2813  1,
2814  GNUNET_NO);
2815 #if MEASURE_PERFORMANCE
2816  perf_store.active_passive_switches += 1;
2817 #endif
2818 
/* Post-increment: salt_send gets the old salt_receive value, and
 * salt_receive is then advanced by one. */
2819  op->salt_send = op->salt_receive++;
2820 
2821  if (GNUNET_OK !=
2822  send_ibf (op, next_size))
2823  {
2824  /* Internal error, best we can do is shut the connection */
2826  "Failed to send IBF, closing connection\n");
2828  ibf_destroy (diff_ibf);
2829  return GNUNET_SYSERR;
2830  }
2831  }
2832  else
2833  {
2835  "# of failed union operations (too large)",
2836  1,
2837  GNUNET_NO);
2838  // XXX: Send the whole set, element-by-element
2840  "set union failed: reached ibf limit\n");
2842  ibf_destroy (diff_ibf);
2843  return GNUNET_SYSERR;
2844  }
2845  break;
2846  }
2847  if (GNUNET_NO == res)
2848  {
2849  struct GNUNET_MQ_Envelope *ev;
2850 
2852  "transmitted all values, sending DONE\n");
2853 
2854 #if MEASURE_PERFORMANCE
2855  perf_store.done.sent += 1;
2856 #endif
2858  GNUNET_MQ_send (op->mq, ev);
2859  /* We now wait until we get a DONE message back
2860  * and then wait for our MQ to be flushed and all our
2861  * demands be delivered. */
2862  break;
2863  }
2864  if (1 == side)
2865  {
2866  struct IBF_Key unsalted_key;
2867  unsalt_key (&key,
2868  op->salt_receive,
2869  &unsalted_key);
2871  unsalted_key);
2872  }
2873  else if (-1 == side)
2874  {
2875  struct GNUNET_MQ_Envelope *ev;
2876  struct InquiryMessage *msg;
2877 
2878 #if MEASURE_PERFORMANCE
2879  perf_store.inquery.sent += 1;
2880  perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2881 #endif
2882 
/* Record the inquiry in op->inquiries_sent under the hash of the key.
 * NOTE(review): the declaration of `mcfs` is missing from this listing;
 * if it is a block-local variable, the map keeps a pointer that dangles
 * after this iteration.  Also confirm who frees `hashed_key`. */
2884  struct GNUNET_HashContext *hashed_key_context =
2886  struct GNUNET_HashCode *hashed_key = (struct
2888  sizeof(struct GNUNET_HashCode));
2890  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2891  &key,
2892  sizeof(struct IBF_Key));
2893  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2894  hashed_key);
2895  GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2896  hashed_key,
2897  &mcfs,
2899  );
2900 
2901  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2902  * the additional complexity. */
2903  ev = GNUNET_MQ_msg_extra (msg,
2904  sizeof(struct IBF_Key),
2906  msg->salt = htonl (op->salt_receive);
2907  GNUNET_memcpy (&msg[1],
2908  &key,
2909  sizeof(struct IBF_Key));
2911  "sending element inquiry for IBF key %lx\n",
2912  (unsigned long) key.key_val);
2913  GNUNET_MQ_send (op->mq, ev);
2914  }
2915  else
2916  {
/* ibf_decode() is expected to report side 1 or -1 only. */
2917  GNUNET_assert (0);
2918  }
2919  }
2920  ibf_destroy (diff_ibf);
2921  return GNUNET_OK;
2922 }
2923 
2924 
/**
 * Validation hook for a #TransmitFullMessage; performs no checks.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter.
 *
 * @param msg the message (unused)
 * @return always #GNUNET_OK
 */
2932 static int
2934  const struct TransmitFullMessage *msg)
2935 {
2936  return GNUNET_OK;
2937 }
2938 
2939 
/**
 * Handle the other peer's announcement that it sends its full set first.
 *
 * Requires #PHASE_EXPECT_IBF, stores the set size and difference values
 * from the message, recomputes the mode of operation locally with
 * estimate_best_mode_of_operation() and, if the checks pass, moves to
 * #PHASE_FULL_RECEIVING.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the conditions of both
 * protocol-violation checks and the failure handling before each return.
 *
 * @param cls the `struct Operation`
 * @param msg the received #TransmitFullMessage
 */
2946 static void
2948  const struct TransmitFullMessage *msg)
2949 {
2950  struct Operation *op = cls;
2951 
2955  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2956  if (GNUNET_OK !=
2957  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2958  {
2959  GNUNET_break (0);
2961  return;
2962  }
2963 
2965  op->remote_element_count = ntohl (msg->remote_set_size);
2966  op->remote_set_diff = ntohl (msg->remote_set_difference);
2967  op->local_set_diff = ntohl (msg->local_set_difference);
2968 
2971  {
2973  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2974  "criteria\n");
2975  GNUNET_break_op (0);
2977  return;
2978  }
2979 
2981  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2982  op->set->content->elements);
/* Average local element size; stays 0 for an empty local set. */
2983  uint64_t avg_element_size = 0;
2984  if (0 < op->local_element_count)
2985  {
2986  op->total_elements_size_local = 0;
2987  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2988  &
2990  op);
2991  avg_element_size = op->total_elements_size_local / op->local_element_count;
2992  }
2993 
2995  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2996  op->
2998  op->
3000  op->local_set_diff,
3001  op->remote_set_diff,
3002  op->
3004  op->
3007  {
3009  "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3010  " : %d\n", mode_of_operation);
3011  GNUNET_break_op (0);
3013  return;
3014  }
3015  op->phase = PHASE_FULL_RECEIVING;
3016 }
3017 
3018 
/**
 * Validate an IBF message (possibly one slice of a larger IBF).
 *
 * The payload must be a non-zero whole number of buckets.  While in
 * #PHASE_EXPECT_IBF_LAST, the offset, IBF size and salt must match the IBF
 * being received; otherwise the phase must be #PHASE_PASSIVE_DECODING or
 * #PHASE_EXPECT_IBF.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`).
 *
 * @param cls the `struct Operation`
 * @param msg the IBF message
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR otherwise
 */
3029 static int
3031  const struct IBFMessage *msg)
3032 {
3033  struct Operation *op = cls;
3034  unsigned int buckets_in_message;
3035 
3036  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3037  / IBF_BUCKET_SIZE;
3038  if (0 == buckets_in_message)
3039  {
3040  GNUNET_break_op (0);
3041  return GNUNET_SYSERR;
3042  }
/* The payload must be an exact multiple of the bucket size. */
3043  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3044  * IBF_BUCKET_SIZE)
3045  {
3046  GNUNET_break_op (0);
3047  return GNUNET_SYSERR;
3048  }
3049  if (op->phase == PHASE_EXPECT_IBF_LAST)
3050  {
3051  if (ntohl (msg->offset) != op->ibf_buckets_received)
3052  {
3053  GNUNET_break_op (0);
3054  return GNUNET_SYSERR;
3055  }
3056 
/* NOTE(review): ibf_size is compared without ntohl(), unlike offset and
 * salt -- confirm the intended byte order of this field. */
3057  if (msg->ibf_size != op->remote_ibf->size)
3058  {
3059  GNUNET_break_op (0);
3060  return GNUNET_SYSERR;
3061  }
3062  if (ntohl (msg->salt) != op->salt_receive)
3063  {
3064  GNUNET_break_op (0);
3065  return GNUNET_SYSERR;
3066  }
3067  }
3068  else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3069  (op->phase != PHASE_EXPECT_IBF))
3070  {
3071  GNUNET_break_op (0);
3072  return GNUNET_SYSERR;
3073  }
3074 
3075  return GNUNET_OK;
3076 }
3077 
3078 
/**
 * Handle an IBF message (one slice of the remote IBF).
 *
 * On the first slice (phase #PHASE_PASSIVE_DECODING or #PHASE_EXPECT_IBF) a
 * new remote IBF is created, the salt is stored and the phase changes to
 * #PHASE_EXPECT_IBF_LAST.  Each slice is read into the remote IBF; once all
 * buckets have arrived the phase becomes #PHASE_ACTIVE_DECODING and
 * decode_and_send() is run.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the end of the `allowed_phases`
 * initialiser and the failure handling before each early return.
 *
 * @param cls the `struct Operation`
 * @param msg the IBF message
 */
3088 static void
3090  const struct IBFMessage *msg)
3091 {
3092  struct Operation *op = cls;
3093  unsigned int buckets_in_message;
3097  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3099  if (GNUNET_OK !=
3100  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3101  {
3102  GNUNET_break (0);
3104  return;
3105  }
3106  op->differential_sync_iterations++;
3108  op->active_passive_switch_required = false;
3109 
3110 #if MEASURE_PERFORMANCE
3111  perf_store.ibf.received += 1;
3112  perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3113 #endif
3114 
3115  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3116  / IBF_BUCKET_SIZE;
3117  if ((op->phase == PHASE_PASSIVE_DECODING) ||
3118  (op->phase == PHASE_EXPECT_IBF))
3119  {
3120  op->phase = PHASE_EXPECT_IBF_LAST;
3121  GNUNET_assert (NULL == op->remote_ibf);
3123  "Creating new ibf of size %u\n",
3124  ntohl (msg->ibf_size));
3125  // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
/* NOTE(review): ibf_size is passed without ntohl() although the log line
 * above converts it -- confirm the intended byte order of this field. */
3126  op->remote_ibf = ibf_create (msg->ibf_size,
3127  ((uint8_t) op->ibf_number_buckets_per_element));
3128  op->salt_receive = ntohl (msg->salt);
3130  "Receiving new IBF with salt %u\n",
3131  op->salt_receive);
3132  if (NULL == op->remote_ibf)
3133  {
3135  "Failed to parse remote IBF, closing connection\n");
3137  return;
3138  }
3139  op->ibf_buckets_received = 0;
/* The first slice of a new IBF must start at offset 0. */
3140  if (0 != ntohl (msg->offset))
3141  {
3142  GNUNET_break_op (0);
3144  return;
3145  }
3146  }
3147  else
3148  {
3151  "Received more of IBF\n");
3152  }
3153  GNUNET_assert (NULL != op->remote_ibf);
3154 
3155  ibf_read_slice (&msg[1],
3156  op->ibf_buckets_received,
3157  buckets_in_message,
3158  op->remote_ibf, msg->ibf_counter_bit_length);
3159  op->ibf_buckets_received += buckets_in_message;
3160 
3161  if (op->ibf_buckets_received == op->remote_ibf->size)
3162  {
3164  "received full ibf\n");
3165  op->phase = PHASE_ACTIVE_DECODING;
3166  if (GNUNET_OK !=
3167  decode_and_send (op))
3168  {
3169  /* Internal error, best we can do is shut down */
3171  "Failed to decode IBF, closing connection\n");
3173  return;
3174  }
3175  }
3176  GNUNET_CADET_receive_done (op->channel);
3177 }
3178 
3179 
/**
 * Send one element to the client of the operation's set as a result
 * message.
 *
 * The result message carries the status, the client request id and the
 * element type, followed by the element data; it is queued on
 * `op->set->cs->mq`.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `op`), the last parameter (`status` is used
 * below) and the statement ending at `op->key_to_element));`.
 *
 * @param element the element to deliver
 */
3188 static void
3190  const struct GNUNET_SETU_Element *element,
3192 {
3193  struct GNUNET_MQ_Envelope *ev;
3194  struct GNUNET_SETU_ResultMessage *rm;
3195 
3197  "sending element (size %u) to client\n",
3198  element->size);
3199  GNUNET_assert (0 != op->client_request_id);
3200  ev = GNUNET_MQ_msg_extra (rm,
3201  element->size,
3203  if (NULL == ev)
3204  {
/* NOTE(review): `ev` is NULL in this branch, so this discards a NULL
 * envelope -- confirm that this is safe or intended. */
3205  GNUNET_MQ_discard (ev);
3206  GNUNET_break (0);
3207  return;
3208  }
3209  rm->result_status = htons (status);
3210  rm->request_id = htonl (op->client_request_id);
3211  rm->element_type = htons (element->element_type);
3213  op->key_to_element));
3214  GNUNET_memcpy (&rm[1],
3215  element->data,
3216  element->size);
3217  GNUNET_MQ_send (op->set->cs->mq,
3218  ev);
3219 }
3220 
3221 
/**
 * Finish the operation if nothing is pending any more.
 *
 * `op->message_control_flow` is iterated with a callback (on a line missing
 * from this listing); a result other than -1 is treated as "ready to
 * finish".  In #PHASE_FINISH_WAITING a final message is then queued, in
 * #PHASE_FINISH_CLOSING the client is notified via send_client_done().
 * Both cases move the operation to #PHASE_FINISHED.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * parameter (assumed `struct Operation *op`).
 */
3227 static void
3229 {
3230  unsigned int num_demanded;
3231 
3232  num_demanded = GNUNET_CONTAINER_multihashmap_size (
3233  op->demanded_hashes);
3234  int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3235  op->message_control_flow,
3236  &
3238  op);
3239  if (PHASE_FINISH_WAITING == op->phase)
3240  {
3242  "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3243  num_demanded, op->peer_site);
3244  if (-1 != send_done)
3245  {
3246  struct GNUNET_MQ_Envelope *ev;
3247 
3248  op->phase = PHASE_FINISHED;
3249 #if MEASURE_PERFORMANCE
3250  perf_store.done.sent += 1;
3251 #endif
3253  GNUNET_MQ_send (op->mq,
3254  ev);
3255  /* We now wait until the other peer sends P2P_OVER
3256  * after it got all elements from us. */
3257  }
3258  }
3259  if (PHASE_FINISH_CLOSING == op->phase)
3260  {
3262  "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3263  num_demanded, op->peer_site);
3264  if (-1 != send_done)
3265  {
3266  op->phase = PHASE_FINISHED;
3267  send_client_done (op);
3269  }
3270  }
3271 }
3272 
3273 
/**
 * Validate an element message: it is rejected when no element is currently
 * demanded (`op->demanded_hashes` is empty).
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`).
 *
 * @param cls the `struct Operation`
 * @param emsg the element message (not inspected here)
 * @return #GNUNET_OK if elements are expected, #GNUNET_SYSERR otherwise
 */
3280 static int
3282  const struct GNUNET_SETU_ElementMessage *emsg)
3283 {
3284  struct Operation *op = cls;
3285 
3286  if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3287  {
3288  GNUNET_break_op (0);
3289  return GNUNET_SYSERR;
3290  }
3291  return GNUNET_OK;
3292 }
3293 
3294 
/**
 * Handle an element that was demanded from the other peer.
 *
 * The element is copied into a new `struct ElementEntry`.  It must be
 * present in `op->demanded_hashes` (it is removed from there) and pass the
 * message control-flow bookkeeping.  Elements already known to the
 * operation are counted as repeated and freed; new ones are registered.
 * The operation is failed when, after more than 8 received elements, fewer
 * than a third were new.  Finally maybe_finish() is called.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the end of the `allowed_phases`
 * initialiser, the hashing call and the failure handling before each early
 * return.
 *
 * @param cls the `struct Operation`
 * @param emsg the element message
 */
3303 static void
3305  const struct GNUNET_SETU_ElementMessage *emsg)
3306 {
3307  struct Operation *op = cls;
3308  struct ElementEntry *ee;
3309  struct KeyEntry *ke;
3310  uint16_t element_size;
3311 
3315  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3317  if (GNUNET_OK !=
3318  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3319  {
3320  GNUNET_break (0);
3322  return;
3323  }
3324 
3325  element_size = ntohs (emsg->header.size) - sizeof(struct
3327 #if MEASURE_PERFORMANCE
3328  perf_store.element.received += 1;
3329  perf_store.element.received_var_bytes += element_size;
3330 #endif
3331 
/* The element data is stored directly behind the entry struct. */
3332  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3333  GNUNET_memcpy (&ee[1],
3334  &emsg[1],
3335  element_size);
3336  ee->element.size = element_size;
3337  ee->element.data = &ee[1];
3338  ee->element.element_type = ntohs (emsg->element_type);
3339  ee->remote = GNUNET_YES;
3341  &ee->element_hash);
/* NOTE(review): `ee` is not freed on the two early-return paths below,
 * as far as this listing shows -- confirm against the missing lines. */
3342  if (GNUNET_NO ==
3343  GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3344  &ee->element_hash,
3345  NULL))
3346  {
3347  /* We got something we didn't demand, since it's not in our map. */
3348  GNUNET_break_op (0);
3350  return;
3351  }
3352 
3353  if (GNUNET_OK !=
3355  op->message_control_flow,
3357  &ee->element_hash,
3359  )
3360  {
3362  "An element has been received more than once!\n");
3363  GNUNET_break (0);
3365  return;
3366  }
3367 
3369  "Got element (size %u, hash %s) from peer\n",
3370  (unsigned int) element_size,
3371  GNUNET_h2s (&ee->element_hash));
3372 
3374  "# received elements",
3375  1,
3376  GNUNET_NO);
3378  "# exchanged elements",
3379  1,
3380  GNUNET_NO);
3381 
3382  op->received_total++;
3383 
3384  ke = op_get_element (op,
3385  &ee->element_hash);
3386  if (NULL != ke)
3387  {
3388  /* Got repeated element. Should not happen since
3389  * we track demands. */
3391  "# repeated elements",
3392  1,
3393  GNUNET_NO);
3394  ke->received = GNUNET_YES;
3395  GNUNET_free (ee);
3396  }
3397  else
3398  {
3400  "Registering new element from remote peer\n");
3401  op->received_fresh++;
3403  /* only send results immediately if the client wants it */
3405  &ee->element,
3407  }
3408 
3409  if ((op->received_total > 8) &&
3410  (op->received_fresh < op->received_total / 3))
3411  {
3412  /* The other peer gave us lots of old elements, there's something wrong. */
3413  GNUNET_break_op (0);
3415  return;
3416  }
3417  GNUNET_CADET_receive_done (op->channel);
3418  maybe_finish (op);
3419 }
3420 
3421 
/**
 * Validation hook for an element received during full-set transmission;
 * currently accepts every message.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`).
 *
 * @param cls the `struct Operation` (unused)
 * @param emsg the element message (unused)
 * @return always #GNUNET_OK
 */
3428 static int
3430  const struct GNUNET_SETU_ElementMessage *emsg)
3431 {
3432  struct Operation *op = cls;
3433 
3434  (void) op;
3435 
3436  // FIXME: check that we expect full elements here?
3437  return GNUNET_OK;
3438 }
3439 
3440 
/**
 * Handle an element received while the other peer transmits its full set.
 *
 * Allowed in #PHASE_FULL_RECEIVING and #PHASE_FULL_SENDING.  The element is
 * copied into a new `struct ElementEntry`; elements already known to the
 * operation are counted as repeated and freed, new ones are registered.
 * In byzantine mode the operation fails once more elements were received
 * than the other peer announced.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the hashing call and the failure
 * handling before each early return.
 *
 * @param cls the `struct Operation`
 * @param emsg the element message
 */
3447 static void
3449  const struct GNUNET_SETU_ElementMessage *emsg)
3450 {
3451  struct Operation *op = cls;
3452  struct ElementEntry *ee;
3453  struct KeyEntry *ke;
3454  uint16_t element_size;
3455 
3459  uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3460  if (GNUNET_OK !=
3461  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3462  {
3463  GNUNET_break (0);
3465  return;
3466  }
3467 
3468  element_size = ntohs (emsg->header.size)
3469  - sizeof(struct GNUNET_SETU_ElementMessage);
3470 
3471 #if MEASURE_PERFORMANCE
3472  perf_store.element_full.received += 1;
3473  perf_store.element_full.received_var_bytes += element_size;
3474 #endif
3475 
/* The element data is stored directly behind the entry struct. */
3476  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3477  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3478  ee->element.size = element_size;
3479  ee->element.data = &ee[1];
3480  ee->element.element_type = ntohs (emsg->element_type);
3481  ee->remote = GNUNET_YES;
3483  &ee->element_hash);
3485  "Got element (full diff, size %u, hash %s) from peer\n",
3486  (unsigned int) element_size,
3487  GNUNET_h2s (&ee->element_hash));
3488 
3490  "# received elements",
3491  1,
3492  GNUNET_NO);
3494  "# exchanged elements",
3495  1,
3496  GNUNET_NO);
3497 
3498  op->received_total++;
3499  ke = op_get_element (op,
3500  &ee->element_hash);
3501  if (NULL != ke)
3502  {
3504  "# repeated elements",
3505  1,
3506  GNUNET_NO);
3508  ke->received = GNUNET_YES;
3509  GNUNET_free (ee);
3510  }
3511  else
3512  {
3514  "Registering new element from remote peer\n");
3515  op->received_fresh++;
3517  /* only send results immediately if the client wants it */
3519  &ee->element,
3521  }
3522 
3523 
3524  if ((GNUNET_YES == op->byzantine) &&
3525  (op->received_total > op->remote_element_count) )
3526  {
3527  /* The other peer sent more elements than it announced, there's something wrong. */
3529  "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3530  (unsigned long long) op->received_total,
3531  (unsigned long long) op->remote_element_count);
3532  GNUNET_break_op (0);
3534  return;
3535  }
3536  GNUNET_CADET_receive_done (op->channel);
3537 }
3538 
3539 
/**
 * Validate an inquiry message: the operation must be in
 * #PHASE_PASSIVE_DECODING and the payload must consist of a whole number
 * of IBF keys.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`).
 *
 * @param cls the `struct Operation`
 * @param msg the inquiry message
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR otherwise
 */
3547 static int
3549  const struct InquiryMessage *msg)
3550 {
3551  struct Operation *op = cls;
3552  unsigned int num_keys;
3553 
3554  if (op->phase != PHASE_PASSIVE_DECODING)
3555  {
3556  GNUNET_break_op (0);
3557  return GNUNET_SYSERR;
3558  }
3559  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3560  / sizeof(struct IBF_Key);
/* Reject payloads that are not an exact multiple of the key size. */
3561  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3562  != num_keys * sizeof(struct IBF_Key))
3563  {
3564  GNUNET_break_op (0);
3565  return GNUNET_SYSERR;
3566  }
3567  return GNUNET_OK;
3568 }
3569 
3570 
/**
 * Handle an inquiry message: for every IBF key in the payload, unsalt it
 * with the salt from the message and pass it on (the callee is on a line
 * missing from this listing).
 *
 * Before the loop one entry is stored in `op->inquiries_sent`.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the declaration of `mcfs` and the
 * failure handling before the early return.
 *
 * @param cls the `struct Operation`
 * @param msg the inquiry message
 */
3577 static void
3579  const struct InquiryMessage *msg)
3580 {
3581  struct Operation *op = cls;
3582  const struct IBF_Key *ibf_key;
3583  unsigned int num_keys;
3584 
3588  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3589  if (GNUNET_OK !=
3590  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3591  {
3592  GNUNET_break (0);
3594  return;
3595  }
3596 
3597 #if MEASURE_PERFORMANCE
3598  perf_store.inquery.received += 1;
3599  perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3600  - sizeof(struct InquiryMessage));
3601 #endif
3602 
3604  "Received union inquiry\n");
3605  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3606  / sizeof(struct IBF_Key);
3607  ibf_key = (const struct IBF_Key *) &msg[1];
3608 
/* NOTE(review): `&ibf_key` is the address of the local pointer variable,
 * so the bytes hashed below are the pointer's own storage rather than the
 * key it points to -- `ibf_key` looks like what was intended.  Also confirm
 * the lifetime of `mcfs` and who frees `hashed_key`. */
3610  struct GNUNET_HashContext *hashed_key_context =
3612  struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3613  sizeof(struct GNUNET_HashCode));;
3615  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3616  &ibf_key,
3617  sizeof(struct IBF_Key));
3618  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3619  hashed_key);
3620  GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3621  hashed_key,
3622  &mcfs,
3624  );
3625 
/* Walk all keys contained in the payload. */
3626  while (0 != num_keys--)
3627  {
3628  struct IBF_Key unsalted_key;
3629  unsalt_key (ibf_key,
3630  ntohl (msg->salt),
3631  &unsalted_key);
3633  unsalted_key);
3634  ibf_key++;
3635  }
3636  GNUNET_CADET_receive_done (op->channel);
3637 }
3638 
3639 
/**
 * Callback invoked per key entry: sends the entry's element on `op->mq`
 * unless the entry is marked as received.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`).
 *
 * @param cls the `struct Operation`
 * @param key 32-bit map key (unused)
 * @param value the `struct KeyEntry`
 * @return always #GNUNET_YES
 */
3650 static int
3652  uint32_t key,
3653  void *value)
3654 {
3655  struct Operation *op = cls;
3656  struct KeyEntry *ke = value;
3657  struct GNUNET_MQ_Envelope *ev;
3658  struct GNUNET_SETU_ElementMessage *emsg;
3659  struct ElementEntry *ee = ke->element;
3660 
/* Entries marked as received are not sent back. */
3661  if (GNUNET_YES == ke->received)
3662  return GNUNET_YES;
3663 #if MEASURE_PERFORMANCE
/* NOTE(review): this is a send path, yet the "received" counter is
 * incremented -- confirm whether "sent" was intended. */
3664  perf_store.element_full.received += 1;
3665 #endif
3666  ev = GNUNET_MQ_msg_extra (emsg,
3667  ee->element.size,
3669  GNUNET_memcpy (&emsg[1],
3670  ee->element.data,
3671  ee->element.size);
3672  emsg->element_type = htons (ee->element.element_type);
3673  GNUNET_MQ_send (op->mq,
3674  ev);
3675  return GNUNET_YES;
3676 }
3677 
3678 
/**
 * Validation hook for a #TransmitFullMessage; performs no checks.
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter.
 *
 * @param mh the message (unused)
 * @return always #GNUNET_OK
 */
3685 static int
3687  const struct TransmitFullMessage *mh)
3688 {
3689  return GNUNET_OK;
3690 }
3691 
3692 
/**
 * Handle the other peer's request that we transmit our full set.
 *
 * Requires #PHASE_EXPECT_IBF, stores the set size and difference values
 * from the message, recomputes the mode of operation locally with
 * estimate_best_mode_of_operation() and, if the checks pass, calls
 * send_full_set().
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`), the conditions of both
 * protocol-violation checks and the failure handling before each return.
 *
 * @param cls the `struct Operation`
 * @param msg the received #TransmitFullMessage
 */
3693 static void
3695  const struct TransmitFullMessage *msg)
3696 {
3697  struct Operation *op = cls;
3698 
3702  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3703  if (GNUNET_OK !=
3704  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3705  {
3706  GNUNET_break (0);
3708  return;
3709  }
3710 
3711  op->remote_element_count = ntohl (msg->remote_set_size);
3712  op->remote_set_diff = ntohl (msg->remote_set_difference);
3713  op->local_set_diff = ntohl (msg->local_set_difference);
3714 
3715 
3717  {
3719  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3720  "criteria\n");
3721  GNUNET_break_op (0);
3723  return;
3724  }
3725 
3726 #if MEASURE_PERFORMANCE
3727  perf_store.request_full.received += 1;
3728 #endif
3729 
3731  "Received request for full set transmission\n");
3732 
3734  op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3735  op->set->content->elements);
/* Average local element size; stays 0 for an empty local set. */
3736  uint64_t avg_element_size = 0;
3737  if (0 < op->local_element_count)
3738  {
3739  op->total_elements_size_local = 0;
3740  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3741  &
3743  op);
3744  avg_element_size = op->total_elements_size_local / op->local_element_count;
3745  }
3746 
3747  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3748  op->
3750  op->
3752  op->local_set_diff,
3753  op->remote_set_diff,
3754  op->
3756  op->
3759  {
3761  "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3762  " : %d\n", mode_of_operation);
3763  GNUNET_break_op (0);
3765  return;
3766  }
3767 
3768  // FIXME: we need to check that our set is larger than the
3769  // byzantine_lower_bound by some threshold
3770  send_full_set (op);
3771  GNUNET_CADET_receive_done (op->channel);
3772 }
3773 
3774 
/**
 * Handle the "full done" message that ends a full-set transmission.
 *
 * - In #PHASE_FULL_RECEIVING: in byzantine mode the number of received
 *   elements must equal the announced remote set size; then the elements
 *   the other peer is missing are sent, a final message is queued and the
 *   operation moves to #PHASE_FINISHED.
 * - In #PHASE_FULL_SENDING: the operation moves to #PHASE_FINISHED and the
 *   client is notified via send_client_done().
 *
 * NOTE(review): this listing omits the line holding the function name and
 * the first parameter (assumed `cls`) and several call lines.
 *
 * @param cls the `struct Operation`
 * @param mh the message header (not inspected in the visible code)
 */
3781 static void
3783  const struct GNUNET_MessageHeader *mh)
3784 {
3785  struct Operation *op = cls;
3786 
3790  uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3791  if (GNUNET_OK !=
3792  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3793  {
3794  GNUNET_break (0);
3796  return;
3797  }
3798 
3799 #if MEASURE_PERFORMANCE
3800  perf_store.full_done.received += 1;
3801 #endif
3802 
3803  switch (op->phase)
3804  {
3805  case PHASE_FULL_RECEIVING:
3806  {
3807  struct GNUNET_MQ_Envelope *ev;
3808 
3809  if ((GNUNET_YES == op->byzantine) &&
3810  (op->received_total != op->remote_element_count) )
3811  {
3812  /* The other peer did not send the announced number of elements before sending full done, there's something wrong. */
3814  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3815  (unsigned long long) op->received_total,
3816  (unsigned long long) op->remote_element_count);
3817  GNUNET_break_op (0);
3819  return;
3820  }
3821 
3823  "got FULL DONE, sending elements that other peer is missing\n");
3824 
3825  /* send all the elements that did not come from the remote peer */
3828  op);
3829 #if MEASURE_PERFORMANCE
3830  perf_store.full_done.sent += 1;
3831 #endif
3833  GNUNET_MQ_send (op->mq,
3834  ev);
3835  op->phase = PHASE_FINISHED;
3836  /* we now wait until the other peer sends us the OVER message*/
3837  }
3838  break;
3839 
3840  case PHASE_FULL_SENDING:
3841  {
3843  "got FULL DONE, finishing\n");
3844  /* We sent the full set, and got the response for that. We're done. */
3845  op->phase = PHASE_FINISHED;
3846  GNUNET_CADET_receive_done (op->channel);
3847  send_client_done (op);
3849  return;
3850  }
3851 
/* Any other phase is a protocol violation (normally already excluded by
 * the phase check at the top). */
3852  default:
3854  "Handle full done phase is %u\n",
3855  (unsigned) op->phase);
3856  GNUNET_break_op (0);
3858  return;
3859  }
3860  GNUNET_CADET_receive_done (op->channel);
3861 }
3862 
3863 
/**
 * Validate the size of a demand message: the payload following the message
 * header must be an exact multiple of `sizeof (struct GNUNET_HashCode)`.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is check_union_p2p_demand -- confirm.
 *
 * @param cls the `struct Operation` (unused apart from the cast)
 * @param mh the message to validate
 * @return #GNUNET_OK if the payload size is well-formed,
 *         #GNUNET_SYSERR otherwise
 */
3872 static int
3874  const struct GNUNET_MessageHeader *mh)
3875 {
3876  struct Operation *op = cls;
3877  unsigned int num_hashes;
3878 
3879  (void) op;
3880  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3881  / sizeof(struct GNUNET_HashCode);
 /* reject payloads with trailing bytes that do not form a full hash */
3882  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3883  != num_hashes * sizeof(struct GNUNET_HashCode))
3884  {
3885  GNUNET_break_op (0);
3886  return GNUNET_SYSERR;
3887  }
3888  return GNUNET_OK;
3889 }
3890 
3891 
/**
 * Handle a demand message: for every hash in the payload, look up the
 * element in the local set and send it to the remote peer.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is handle_union_p2p_demand -- confirm.  Several
 * call lines inside the body are missing from the listing as well.
 *
 * Processing stops (without calling GNUNET_CADET_receive_done() in the
 * visible lines) as soon as a demanded element is unknown or one of the
 * message-control-flow checks fails.
 *
 * @param cls the `struct Operation` the message belongs to
 * @param mh the demand message; payload is an array of `struct GNUNET_HashCode`
 */
3899 static void
3901  const struct GNUNET_MessageHeader *mh)
3902 {
3903  struct Operation *op = cls;
3904  struct ElementEntry *ee;
3905  struct GNUNET_SETU_ElementMessage *emsg;
3906  const struct GNUNET_HashCode *hash;
3907  unsigned int num_hashes;
3908  struct GNUNET_MQ_Envelope *ev;
3909 
3913  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3915  if (GNUNET_OK !=
3916  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3917  {
3918  GNUNET_break (0);
3920  return;
3921  }
3922 #if MEASURE_PERFORMANCE
3923  perf_store.demand.received += 1;
3924  perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3926 #endif
3927 
 /* payload is a packed array of hashes directly after the header */
3928  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3929  / sizeof(struct GNUNET_HashCode);
3930  for (hash = (const struct GNUNET_HashCode *) &mh[1];
3931  num_hashes > 0;
3932  hash++, num_hashes--)
3933  {
3934  ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3935  hash);
3936  if (NULL == ee)
3937  {
3938  /* Demand for non-existing element. */
3939  GNUNET_break_op (0);
3941  return;
3942  }
3943 
3944  /* Record the received demand message for message control */
3945  if (GNUNET_YES !=
3947  op->message_control_flow,
3949  &ee->element_hash,
3951  )
3952  {
3954  "Double demand message received found!\n");
3955  GNUNET_break (0);
3957  return;
3958  }
 /* NOTE(review): the next line is a stray empty statement */
3959  ;
3960 
3961  /* Record that the element is about to be sent (MSG_CFS_SENT) */
3962  if (GNUNET_YES !=
3964  op->message_control_flow,
3965  MSG_CFS_SENT,
3966  &ee->element_hash,
3968  )
3969  {
3971  "Double element message sent found!\n");
3972  GNUNET_break (0);
3974  return;
3975  }
 /* NOTE(review): the condition guarding the following block is missing
    from this listing */
3977  {
3978  /* Probably confused lazily copied sets. */
3979  GNUNET_break_op (0);
3981  return;
3982  }
3983 #if MEASURE_PERFORMANCE
3984  perf_store.element.sent += 1;
3985  perf_store.element.sent_var_bytes += ee->element.size;
3986 #endif
 /* build the element message: fixed part followed by a copy of the data */
3987  ev = GNUNET_MQ_msg_extra (emsg,
3988  ee->element.size,
3990  GNUNET_memcpy (&emsg[1],
3991  ee->element.data,
3992  ee->element.size);
3993  emsg->reserved = htons (0);
3994  emsg->element_type = htons (ee->element.element_type);
3996  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
3997  op,
3998  (unsigned int) ee->element.size,
3999  GNUNET_h2s (&ee->element_hash));
4000  GNUNET_MQ_send (op->mq, ev);
4002  "# exchanged elements",
4003  1,
4004  GNUNET_NO);
4005  if (op->symmetric)
4007  &ee->element,
4009  }
 /* all demands served: let CADET deliver the next message and check
    whether the operation can be completed */
4010  GNUNET_CADET_receive_done (op->channel);
4011  maybe_finish (op);
4012 }
4013 
4014 
/**
 * Validate an offer message: it is only acceptable while the operation is
 * in PHASE_PASSIVE_DECODING or PHASE_ACTIVE_DECODING, and its payload must
 * be an exact multiple of `sizeof (struct GNUNET_HashCode)`.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is check_union_p2p_offer -- confirm.
 *
 * @param cls the `struct Operation` the message belongs to
 * @param mh the message to validate
 * @return #GNUNET_OK if the message is acceptable,
 *         #GNUNET_SYSERR otherwise
 */
4022 static int
4024  const struct GNUNET_MessageHeader *mh)
4025 {
4026  struct Operation *op = cls;
4027  unsigned int num_hashes;
4028 
4029  /* offers are only valid while one of the peers is decoding */
4030  if ((op->phase != PHASE_PASSIVE_DECODING) &&
4031  (op->phase != PHASE_ACTIVE_DECODING))
4032  {
4033  GNUNET_break_op (0);
4034  return GNUNET_SYSERR;
4035  }
4036  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4037  / sizeof(struct GNUNET_HashCode);
 /* reject payloads with trailing bytes that do not form a full hash */
4038  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4039  num_hashes * sizeof(struct GNUNET_HashCode))
4040  {
4041  GNUNET_break_op (0);
4042  return GNUNET_SYSERR;
4043  }
4044  return GNUNET_OK;
4045 }
4046 
4047 
/**
 * Handle an offer message: for every offered hash that is neither already
 * in the local set nor already demanded, send a demand for it.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is handle_union_p2p_offer -- confirm.  Several
 * call lines inside the body are missing from the listing as well.
 *
 * One demand message carrying a single hash is sent per offered element.
 *
 * @param cls the `struct Operation` the message belongs to
 * @param mh the offer message; payload is an array of `struct GNUNET_HashCode`
 */
4055 static void
4057  const struct GNUNET_MessageHeader *mh)
4058 {
4059  struct Operation *op = cls;
4060  const struct GNUNET_HashCode *hash;
4061  unsigned int num_hashes;
4065  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4066  if (GNUNET_OK !=
4067  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4068  {
4069  GNUNET_break (0);
4071  return;
4072  }
4073 
4074 #if MEASURE_PERFORMANCE
4075  perf_store.offer.received += 1;
4076  perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4078 #endif
4079 
 /* payload is a packed array of hashes directly after the header */
4080  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4081  / sizeof(struct GNUNET_HashCode);
4082  for (hash = (const struct GNUNET_HashCode *) &mh[1];
4083  num_hashes > 0;
4084  hash++, num_hashes--)
4085  {
4086  struct ElementEntry *ee;
4087  struct GNUNET_MessageHeader *demands;
4088  struct GNUNET_MQ_Envelope *ev;
4089 
4090  ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4091  hash);
 /* NOTE(review): part of this condition is missing from the listing;
    entries found in the local set can lead to skipping the hash */
4092  if (NULL != ee)
4094  continue;
4095 
4096  if (GNUNET_YES ==
4097  GNUNET_CONTAINER_multihashmap_contains (op->demanded_hashes,
4098  hash))
4099  {
4101  "Skipped sending duplicate demand\n");
4102  continue;
4103  }
4104 
4107  op->demanded_hashes,
4108  hash,
4109  NULL,
4111 
4113  "[OP %p] Requesting element (hash %s)\n",
4114  op, GNUNET_h2s (hash));
4115 
4116 #if MEASURE_PERFORMANCE
4117  perf_store.demand.sent += 1;
4118  perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4119 #endif
4120  /* Record the demand message we are about to send for message control */
4121  if (GNUNET_YES !=
4123  op->message_control_flow,
4124  MSG_CFS_SENT,
4125  hash,
4126  DEMAND_MESSAGE))
4127  {
4129  "Double demand message sent found!\n");
4130  GNUNET_break (0);
4132  return;
4133  }
4134 
4135  /* Mark offer as received */
4136  if (GNUNET_YES !=
4138  op->message_control_flow,
4140  hash,
4141  OFFER_MESSAGE))
4142  {
4144  "Double offer message received found!\n");
4145  GNUNET_break (0);
4147  return;
4148  }
4149  /* Mark element as expected to be received */
4150  if (GNUNET_YES !=
4152  op->message_control_flow,
4154  hash,
4155  ELEMENT_MESSAGE))
4156  {
4158  "Element already expected!\n");
4159  GNUNET_break (0);
4161  return;
4162  }
 /* demand message: plain header followed by the single demanded hash */
4163  ev = GNUNET_MQ_msg_header_extra (demands,
4164  sizeof(struct GNUNET_HashCode),
4166  GNUNET_memcpy (&demands[1],
4167  hash,
4168  sizeof(struct GNUNET_HashCode));
4169  GNUNET_MQ_send (op->mq, ev);
4170  }
4171  GNUNET_CADET_receive_done (op->channel);
4172 }
4173 
4174 
/**
 * Handle a "done" message from the remote peer.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is handle_union_p2p_done -- confirm.
 *
 * The message is only accepted in PHASE_ACTIVE_DECODING or
 * PHASE_PASSIVE_DECODING, and is treated as an error while
 * `op->active_passive_switch_required` is set.
 *
 * @param cls the `struct Operation` the message belongs to
 * @param mh the received message (not read in the visible lines)
 */
4181 static void
4183  const struct GNUNET_MessageHeader *mh)
4184 {
4185  struct Operation *op = cls;
4186 
4190  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4191  if (GNUNET_OK !=
4192  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4193  {
4194  GNUNET_break (0);
4196  return;
4197  }
4198 
4199  if (op->active_passive_switch_required)
4200  {
4202  "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4203  GNUNET_break (0);
4205  return;
4206  }
4207 
4208 #if MEASURE_PERFORMANCE
4209  perf_store.done.received += 1;
4210 #endif
4211  switch (op->phase)
4212  {
 /* NOTE(review): the case label for this branch is missing from the
    listing; judging by the log text it is the passive-decoding case */
4214  /* We got all requests, but still have to send our elements in response. */
4215  op->phase = PHASE_FINISH_WAITING;
4217  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4218  /* The active peer is done sending offers
4219  * and inquiries. This means that all
4220  * our responses to that (demands and offers)
4221  * must be in flight (queued or in mesh).
4222  *
4223  * We should notify the active peer once
4224  * all our demands are satisfied, so that the active
4225  * peer can quit if we gave it everything.
4226  */GNUNET_CADET_receive_done (op->channel);
4227  maybe_finish (op);
4228  return;
4229  case PHASE_ACTIVE_DECODING:
4231  "got DONE (as active partner), waiting to finish\n");
4232  /* All demands of the other peer are satisfied,
4233  * and we processed all offers, thus we know
4234  * exactly what our demands must be.
4235  *
4236  * We'll close the channel
4237  * to the other peer once our demands are met.
4238  */op->phase = PHASE_FINISH_CLOSING;
4239  GNUNET_CADET_receive_done (op->channel);
4240  maybe_finish (op);
4241  return;
4242  default:
4243  GNUNET_break_op (0);
4245  return;
4246  }
4247 }
4248 
4249 
/**
 * Handle an "over" message from the remote peer by reporting completion to
 * the local client via send_client_done().
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is handle_union_p2p_over -- confirm.
 *
 * @param cls closure, passed on unchanged to send_client_done()
 * @param mh the received message (unused)
 */
4256 static void
4258  const struct GNUNET_MessageHeader *mh)
4259 {
4260 #if MEASURE_PERFORMANCE
4261  perf_store.over.received += 1;
4262 #endif
4263  send_client_done (cls);
4264 }
4265 
4266 
4275 static struct Operation *
4276 get_incoming (uint32_t id)
4277 {
4278  for (struct Listener *listener = listener_head;
4279  NULL != listener;
4280  listener = listener->next)
4281  {
4282  for (struct Operation *op = listener->op_head;
4283  NULL != op;
4284  op = op->next)
4285  if (op->suggest_id == id)
4286  return op;
4287  }
4288  return NULL;
4289 }
4290 
4291 
/**
 * Callback for a newly connected client: allocates the per-client state,
 * remembers the client handle and its message queue, and counts the client
 * in `num_clients`.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; presumably this is client_connect_cb -- confirm.
 *
 * @param c the client that connected
 * @param mq message queue for talking to @a c
 * @return the new `struct ClientState`
 */
4300 static void *
4302  struct GNUNET_SERVICE_Client *c,
4303  struct GNUNET_MQ_Handle *mq)
4304 {
4305  struct ClientState *cs;
4306 
4307  num_clients++;
4308  cs = GNUNET_new (struct ClientState);
4309  cs->client = c;
4310  cs->mq = mq;
4311  return cs;
4312 }
4313 
4314 
/**
 * Hash map iterator callback that frees one element entry.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; presumably this is destroy_elements_iterator
 * -- confirm.
 *
 * @param key hash of the entry (unused)
 * @param value the `struct ElementEntry` to free
 * @return #GNUNET_YES, always
 */
4323 static int
4325  const struct GNUNET_HashCode *key,
4326  void *value)
4327 {
4328  struct ElementEntry *ee = value;
4329 
4330  GNUNET_free (ee);
4331  return GNUNET_YES;
4332 }
4333 
4334 
/**
 * Callback for a disconnecting client: releases everything the client
 * owned (its set, if any, and its listener, if any), frees the client
 * state and decrements `num_clients`.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing (presumably client_disconnect_cb -- confirm),
 * as are several call lines inside the body.
 *
 * @param client the client that disconnected
 * @param internal_cls the `struct ClientState` of @a client
 */
4342 static void
4344  struct GNUNET_SERVICE_Client *client,
4345  void *internal_cls)
4346 {
4347  struct ClientState *cs = internal_cls;
4348  struct Operation *op;
4349  struct Listener *listener;
4350  struct Set *set;
4351 
4353  "Client disconnected, cleaning up\n");
4354  if (NULL != (set = cs->set))
4355  {
4356  struct SetContent *content = set->content;
4357 
4359  "Destroying client's set\n");
4360  /* Destroy pending set operations */
4361  while (NULL != set->ops_head)
4363 
4364  /* Destroy operation-specific state */
4365  if (NULL != set->se)
4366  {
4368  set->se = NULL;
4369  }
4370  /* free set content (or at least decrement RC) */
4371  set->content = NULL;
4372  GNUNET_assert (0 != content->refcount);
4373  content->refcount--;
 /* last reference gone: the content itself can be released */
4374  if (0 == content->refcount)
4375  {
4376  GNUNET_assert (NULL != content->elements);
4379  NULL);
4381  content->elements = NULL;
4382  GNUNET_free (content);
4383  }
4384  GNUNET_free (set);
4385  }
4386 
4387  if (NULL != (listener = cs->listener))
4388  {
4390  "Destroying client's listener\n");
4391  GNUNET_CADET_close_port (listener->open_port);
4392  listener->open_port = NULL;
 /* tear down every incoming operation still queued at this listener */
4393  while (NULL != (op = listener->op_head))
4394  {
4396  "Destroying incoming operation `%u' from peer `%s'\n",
4397  (unsigned int) op->client_request_id,
4398  GNUNET_i2s (&op->peer));
4399  incoming_destroy (op);
4400  }
4402  listener_tail,
4403  listener);
4404  GNUNET_free (listener);
4405  }
4406  GNUNET_free (cs);
4407  num_clients--;
 /* during shutdown, the last client to leave clears the CADET handle */
4408  if ( (GNUNET_YES == in_shutdown) &&
4409  (0 == num_clients) )
4410  {
4411  if (NULL != cadet)
4412  {
4414  cadet = NULL;
4415  }
4416  }
4417 }
4418 
4419 
/**
 * Validate an operation request arriving on an incoming channel.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is check_incoming_msg -- confirm.
 *
 * The request is rejected if the operation already has a suggest id (i.e.
 * a request was seen before), if the operation has no listener, or if the
 * nested context message exceeds #GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE.
 *
 * @param cls the `struct Operation` of the incoming channel
 * @param msg the operation request to validate
 * @return #GNUNET_OK if the request is acceptable,
 *         #GNUNET_SYSERR otherwise
 */
4428 static int
4430  const struct OperationRequestMessage *msg)
4431 {
4432  struct Operation *op = cls;
4433  struct Listener *listener = op->listener;
4434  const struct GNUNET_MessageHeader *nested_context;
4435 
4436  /* double operation request */
4437  if (0 != op->suggest_id)
4438  {
4439  GNUNET_break_op (0);
4440  return GNUNET_SYSERR;
4441  }
4442  /* This should be equivalent to the previous condition, but can't hurt to check twice */
4443  if (NULL == listener)
4444  {
4445  GNUNET_break (0);
4446  return GNUNET_SYSERR;
4447  }
4448  nested_context = GNUNET_MQ_extract_nested_mh (msg);
 /* a context message is optional, but bounded in size when present */
4449  if ((NULL != nested_context) &&
4450  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4451  {
4452  GNUNET_break_op (0);
4453  return GNUNET_SYSERR;
4454  }
4455  return GNUNET_OK;
4456 }
4457 
4458 
/**
 * Handle an operation request on an incoming channel: remember the request
 * data in the operation, assign a fresh suggest id, cancel the pending
 * timeout and forward the request to the client that owns the listener.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing; presumably this is handle_incoming_msg -- confirm.
 *
 * GNUNET_CADET_receive_done() is deliberately not called here (see the
 * comment at the end of the function).
 *
 * @param cls the `struct Operation` of the incoming channel
 * @param msg the operation request
 */
4474 static void
4476  const struct OperationRequestMessage *msg)
4477 {
4478  struct Operation *op = cls;
4479  struct Listener *listener = op->listener;
4480  const struct GNUNET_MessageHeader *nested_context;
4481  struct GNUNET_MQ_Envelope *env;
4482  struct GNUNET_SETU_RequestMessage *cmsg;
4483 
4484  nested_context = GNUNET_MQ_extract_nested_mh (msg);
4485  /* Make a copy of the nested_context (application-specific context
4486  information that is opaque to set) so we can pass it to the
4487  listener later on */
4488  if (NULL != nested_context)
4489  op->context_msg = GNUNET_copy_message (nested_context);
4490  op->remote_element_count = ntohl (msg->element_count);
4491  GNUNET_log (
4493  "Received P2P operation request (port %s) for active listener\n",
4494  GNUNET_h2s (&op->listener->app_id));
4495  GNUNET_assert (0 == op->suggest_id);
 /* 0 means "not suggested yet", so the global counter skips it */
4496  if (0 == suggest_id)
4497  suggest_id++;
4498  op->suggest_id = suggest_id++;
 /* the request arrived in time: stop the incoming-channel timeout */
4499  GNUNET_assert (NULL != op->timeout_task);
4500  GNUNET_SCHEDULER_cancel (op->timeout_task);
4501  op->timeout_task = NULL;
4502  env = GNUNET_MQ_msg_nested_mh (cmsg,
4504  op->context_msg);
4505  GNUNET_log (
4507  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4508  op->suggest_id,
4509  listener,
4510  listener->cs);
4511  cmsg->accept_id = htonl (op->suggest_id);
4512  cmsg->peer_id = op->peer;
4513  GNUNET_MQ_send (listener->cs->mq,
4514  env);
4515  /* NOTE: GNUNET_CADET_receive_done() will be called in
4516  #handle_client_accept() */
4517 }
4518 
4519 
/**
 * Handle a client's request to create a set.  A client may own at most one
 * set; the new set gets a strata estimator and fresh set content with a
 * reference count of 1.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_create_set -- confirm), as are several
 * call lines inside the body.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the create message (not read in the visible lines)
 */
4528 static void
4530  const struct GNUNET_SETU_CreateMessage *msg)
4531 {
4532  struct ClientState *cs = cls;
4533  struct Set *set;
4534 
4536  "Client created new set for union operation\n");
4537  if (NULL != cs->set)
4538  {
4539  /* There can only be one set per client */
4540  GNUNET_break (0);
4542  return;
4543  }
4544  set = GNUNET_new (struct Set);
4545  {
4546  struct MultiStrataEstimator *se;
4547 
4550  SE_IBF_HASH_NUM);
4551  if (NULL == se)
4552  {
4554  "Failed to allocate strata estimator\n");
4555  GNUNET_free (set);
4557  return;
4558  }
4559  set->se = se;
4560  }
4561  set->content = GNUNET_new (struct SetContent);
4562  set->content->refcount = 1;
4564  GNUNET_YES);
 /* link client state and set in both directions */
4565  set->cs = cs;
4566  cs->set = set;
4568 }
4569 
4570 
/**
 * Scheduler task run when an incoming channel timed out: clears the task
 * handle and destroys the incoming operation.
 *
 * NOTE(review): the line carrying the function name and parameter is
 * missing from this listing; presumably this is incoming_timeout_cb
 * -- confirm.
 *
 * @param cls the `struct Operation` that timed out
 */
4580 static void
4582 {
4583  struct Operation *op = cls;
4584 
 /* the task is running, so the stored handle is no longer valid */
4585  op->timeout_task = NULL;
4587  "Remote peer's incoming request timed out\n");
4588  incoming_destroy (op);
4589 }
4590 
4591 
/**
 * Callback for a new incoming CADET channel on a listener's port:
 * allocates an operation for the channel and ties it to the listener.
 *
 * NOTE(review): several call lines of the body are missing from this
 * listing (judging by the leftover arguments, among them the call that
 * links the operation into the listener's list).
 *
 * @param cls the `struct Listener` whose port was connected to
 * @param channel the new channel
 * @param source identity of the peer that opened the channel
 * @return the new `struct Operation`
 */
4608 static void *
4609 channel_new_cb (void *cls,
4610  struct GNUNET_CADET_Channel *channel,
4611  const struct GNUNET_PeerIdentity *source)
4612 {
4613  struct Listener *listener = cls;
4614  struct Operation *op;
4615 
4617  "New incoming channel\n");
4618  op = GNUNET_new (struct Operation);
4619  op->listener = listener;
4620  op->peer = *source;
4621  op->channel = channel;
4622  op->mq = GNUNET_CADET_get_mq (op->channel);
4624  UINT32_MAX);
4627  op);
4629  listener->op_tail,
4630  op);
4631  return op;
4632 }
4633 
4634 
/**
 * Callback for the end of a CADET channel: forgets the channel handle in
 * the associated operation.
 *
 * NOTE(review): the statement following the assignment is missing from
 * this listing.
 *
 * @param channel_ctx the `struct Operation` associated with the channel
 * @param channel the channel that ended (unused in the visible lines)
 */
4651 static void
4652 channel_end_cb (void *channel_ctx,
4653  const struct GNUNET_CADET_Channel *channel)
4654 {
4655  struct Operation *op = channel_ctx;
4656 
4657  op->channel = NULL;
4659 }
4660 
4661 
/**
 * Callback for a change of a channel's transmission window.  Intentionally
 * empty; see the FIXME in the body.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; presumably this is channel_window_cb -- confirm.
 *
 * @param channel the channel whose window changed (unused)
 * @param window_size the new window size (unused)
 */
4676 static void
4678  const struct GNUNET_CADET_Channel *channel,
4679  int window_size)
4680 {
4681  /* FIXME: not implemented, we could do flow control here... */
4682 }
4683 
4684 
/**
 * Handle a client's request to listen for incoming operations: creates the
 * listener for the client and opens a CADET port for the application id
 * given in the message.  A client may have at most one listener.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_listen -- confirm), as are the message
 * type lines of the handler table and several call lines.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the listen message carrying the application id
 */
4692 static void
4694  const struct GNUNET_SETU_ListenMessage *msg)
4695 {
4696  struct ClientState *cs = cls;
 /* Handlers for messages on incoming channels.  All closures are NULL
    here. */
4697  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4698  GNUNET_MQ_hd_var_size (incoming_msg,
4700  struct OperationRequestMessage,
4701  NULL),
4702  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4704  struct IBFMessage,
4705  NULL),
4706  GNUNET_MQ_hd_var_size (union_p2p_elements,
4709  NULL),
4710  GNUNET_MQ_hd_var_size (union_p2p_offer,
4712  struct GNUNET_MessageHeader,
4713  NULL),
4714  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4716  struct InquiryMessage,
4717  NULL),
4718  GNUNET_MQ_hd_var_size (union_p2p_demand,
4720  struct GNUNET_MessageHeader,
4721  NULL),
4722  GNUNET_MQ_hd_fixed_size (union_p2p_done,
4724  struct GNUNET_MessageHeader,
4725  NULL),
4726  GNUNET_MQ_hd_fixed_size (union_p2p_over,
4728  struct GNUNET_MessageHeader,
4729  NULL),
4730  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4732  struct GNUNET_MessageHeader,
4733  NULL),
4734  GNUNET_MQ_hd_var_size (union_p2p_request_full,
4736  struct TransmitFullMessage,
4737  NULL),
 /* the strata estimator handler is registered twice; the message type
    lines that distinguish the two entries are missing from the listing */
4738  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4740  struct StrataEstimatorMessage,
4741  NULL),
4742  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4744  struct StrataEstimatorMessage,
4745  NULL),
4746  GNUNET_MQ_hd_var_size (union_p2p_full_element,
4749  NULL),
4750  GNUNET_MQ_hd_var_size (union_p2p_send_full,
4752  struct TransmitFullMessage,
4753  NULL),
4755  };
4756  struct Listener *listener;
4757 
4758  if (NULL != cs->listener)
4759  {
4760  /* max. one active listener per client! */
4761  GNUNET_break (0);
4763  return;
4764  }
4765  listener = GNUNET_new (struct Listener);
 /* link client state and listener in both directions */
4766  listener->cs = cs;
4767  cs->listener = listener;
4768  listener->app_id = msg->app_id;
4770  listener_tail,
4771  listener);
4773  "New listener created (port %s)\n",
4774  GNUNET_h2s (&listener->app_id));
 /* the listener is the closure handed to channel_new_cb() */
4775  listener->open_port = GNUNET_CADET_open_port (cadet,
4776  &msg->app_id,
4777  &channel_new_cb,
4778  listener,
4780  &channel_end_cb,
4781  cadet_handlers);
4783 }
4784 
4785 
/**
 * Handle a client's rejection of a suggested incoming operation.  The
 * operation is looked up by the id in the message; an unknown id is not
 * treated as an error.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_reject -- confirm), as are the final
 * statements of the body.
 *
 * @param cls the `struct ClientState` of the rejecting client
 * @param msg the reject message carrying the accept/reject id
 */
4793 static void
4795  const struct GNUNET_SETU_RejectMessage *msg)
4796 {
4797  struct ClientState *cs = cls;
4798  struct Operation *op;
4799 
4800  op = get_incoming (ntohl (msg->accept_reject_id));
4801  if (NULL == op)
4802  {
4803  /* no matching incoming operation for this reject;
4804  could be that the other peer already disconnected... */
4806  "Client rejected unknown operation %u\n",
4807  (unsigned int) ntohl (msg->accept_reject_id));
4809  return;
4810  }
 /* NOTE(review): cs->listener is dereferenced below without a NULL check,
    and get_incoming() searches all listeners, not only this client's
    -- confirm that a client without a listener cannot get here */
4812  "Peer request (app %s) rejected by client\n",
4813  GNUNET_h2s (&cs->listener->app_id));
4816 }
4817 
4818 
/**
 * Validate a client's request to add an element to its set.  Currently
 * accepts every message.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; presumably this is check_client_set_add
 * -- confirm.
 *
 * @param msg the element message (not inspected)
 * @return #GNUNET_OK, always
 */
4825 static int
4827  const struct GNUNET_SETU_ElementMessage *msg)
4828 {
4829  /* NOTE: Technically, we should probably check with the
4830  block library whether the element we are given is well-formed */
4831  return GNUNET_OK;
4832 }
4833 
4834 
/**
 * Handle a client's request to add an element to its set.  An element that
 * is not yet present is copied into a freshly allocated entry tagged with
 * the set's current generation; an element that is already present is
 * ignored.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_set_add -- confirm), as are several
 * call lines (among them the hashing of the element and the lookup that
 * assigns `ee`).
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the element message; the element data follows the fixed part
 */
4841 static void
4843  const struct GNUNET_SETU_ElementMessage *msg)
4844 {
4845  struct ClientState *cs = cls;
4846  struct Set *set;
4847  struct GNUNET_SETU_Element el;
4848  struct ElementEntry *ee;
4849  struct GNUNET_HashCode hash;
4850 
4851  if (NULL == (set = cs->set))
4852  {
4853  /* client without a set requested an operation */
4854  GNUNET_break (0);
4856  return;
4857  }
4859  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
 /* the element data is everything after the fixed-size message part */
4860  el.size = ntohs (msg->header.size) - sizeof(*msg);
4861  el.data = &msg[1];
4862  el.element_type = ntohs (msg->element_type);
4864  &hash);
4866  &hash);
4867  if (NULL == ee)
4868  {
4870  "Client inserts element %s of size %u\n",
4871  GNUNET_h2s (&hash),
4872  el.size);
 /* entry and element data live in one allocation; the data directly
    follows the entry */
4873  ee = GNUNET_malloc (el.size + sizeof(*ee));
4874  ee->element.size = el.size;
4875  GNUNET_memcpy (&ee[1], el.data, el.size);
4876  ee->element.data = &ee[1];
4877  ee->element.element_type = el.element_type;
4878  ee->remote = GNUNET_NO;
4879  ee->generation = set->current_generation;
4880  ee->element_hash = hash;
4883  set->content->elements,
4884  &ee->element_hash,
4885  ee,
4887  }
4888  else
4889  {
4891  "Client inserted element %s of size %u twice (ignored)\n",
4892  GNUNET_h2s (&hash),
4893  el.size);
4894  /* same element inserted twice */
4895  return;
4896  }
4898  get_ibf_key (&ee->element_hash));
4899 }
4900 
4901 
/**
 * Advance the generation of a set: increments both the latest generation of
 * the set content and the set's current generation.
 *
 * NOTE(review): the line carrying the function name and parameter is
 * missing from this listing; presumably this is
 * advance_generation (struct Set *set) -- confirm.
 */
4908 static void
4910 {
4911  set->content->latest_generation++;
4912  set->current_generation++;
4913 }
4914 
4915 
/**
 * Validate a client's request to evaluate a set operation.  Currently
 * accepts every message (see the FIXME in the body).
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; presumably this is check_client_evaluate
 * -- confirm.
 *
 * @param msg the evaluate message (not inspected)
 * @return #GNUNET_OK, always
 */
4925 static int
4927  const struct GNUNET_SETU_EvaluateMessage *msg)
4928 {
4929  /* FIXME: suboptimal, even if the context below could be NULL,
4930  there are malformed messages this does not check for... */
4931  return GNUNET_OK;
4932 }
4933 
4934 
/**
 * Handle a client's request to evaluate a set operation with a remote peer:
 * creates the operation from the parameters in the message, opens a CADET
 * channel to the target peer and sends the operation request.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_evaluate -- confirm), as are the
 * message type lines of the handler table and several call lines (among
 * them the assignment of `context`).
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the evaluate message with the operation parameters
 */
4943 static void
4945  const struct GNUNET_SETU_EvaluateMessage *msg)
4946 {
4947  struct ClientState *cs = cls;
4948  struct Operation *op = GNUNET_new (struct Operation);
4949 
 /* Handlers for messages on the outgoing channel; the operation is given
    as the closure. */
4950  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4951  GNUNET_MQ_hd_var_size (incoming_msg,
4953  struct OperationRequestMessage,
4954  op),
4955  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4957  struct IBFMessage,
4958  op),
4959  GNUNET_MQ_hd_var_size (union_p2p_elements,
4962  op),
4963  GNUNET_MQ_hd_var_size (union_p2p_offer,
4965  struct GNUNET_MessageHeader,
4966  op),
4967  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4969  struct InquiryMessage,
4970  op),
4971  GNUNET_MQ_hd_var_size (union_p2p_demand,
4973  struct GNUNET_MessageHeader,
4974  op),
4975  GNUNET_MQ_hd_fixed_size (union_p2p_done,
4977  struct GNUNET_MessageHeader,
4978  op),
4979  GNUNET_MQ_hd_fixed_size (union_p2p_over,
4981  struct GNUNET_MessageHeader,
4982  op),
4983  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4985  struct GNUNET_MessageHeader,
4986  op),
4987  GNUNET_MQ_hd_var_size (union_p2p_request_full,
4989  struct TransmitFullMessage,
4990  op),
4991  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4993  struct StrataEstimatorMessage,
4994  op),
4995  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4997  struct StrataEstimatorMessage,
4998  op),
4999  GNUNET_MQ_hd_var_size (union_p2p_full_element,
5002  op),
 /* NOTE(review): this entry passes NULL where every other entry passes
    `op` -- confirm whether that is intended */
5003  GNUNET_MQ_hd_var_size (union_p2p_send_full,
5005  struct TransmitFullMessage,
5006  NULL),
5008  };
5009  struct Set *set;
5010  const struct GNUNET_MessageHeader *context;
5011 
5012  if (NULL == (set = cs->set))
5013  {
 /* client without a set requested an operation */
5014  GNUNET_break (0);
5015  GNUNET_free (op);
5017  return;
5018  }
5020  UINT32_MAX);
 /* take over the operation parameters from the client's message */
5021  op->peer = msg->target_peer;
5022  op->client_request_id = ntohl (msg->request_id);
5023  op->byzantine = msg->byzantine;
5024  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5025  op->force_full = msg->force_full;
5026  op->force_delta = msg->force_delta;
5027  op->symmetric = msg->symmetric;
5028  op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5029  op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5030  op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5031  op->byzantine_upper_bound = msg->byzantine_upper_bond;
5032  op->active_passive_switch_required = false;
5034 
5035  /* create hashmap for message control */
5036  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5037  GNUNET_NO);
5038  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5039 
5040 #if MEASURE_PERFORMANCE
5041  /* load config */
5042  load_config (op);
5043 #endif
5044 
5045  /* Advance generation values, so that
5046  mutations won't interfere with the running operation. */
5047  op->set = set;
5048  op->generation_created = set->current_generation;
5049  advance_generation (set);
5051  set->ops_tail,
5052  op);
5054  "Creating new CADET channel to port %s for set union\n",
5055  GNUNET_h2s (&msg->app_id));
5056  op->channel = GNUNET_CADET_channel_create (cadet,
5057  op,
5058  &msg->target_peer,
5059  &msg->app_id,
5061  &channel_end_cb,
5062  cadet_handlers);
5063  op->mq = GNUNET_CADET_get_mq (op->channel);
5064  {
5065  struct GNUNET_MQ_Envelope *ev;
 /* NOTE(review): this local shadows the function parameter `msg` */
5066  struct OperationRequestMessage *msg;
5067 
5068 #if MEASURE_PERFORMANCE
5069  perf_store.operation_request.sent += 1;
5070 #endif
5073  context);
5074  if (NULL == ev)
5075  {
5076  /* the context message is too large */
5077  GNUNET_break (0);
5079  return;
5080  }
5081  op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5082  GNUNET_NO);
5083  /* copy the current generation's strata estimator for this operation */
5084  op->se = strata_estimator_dup (op->set->se);
5085  /* we started the operation, thus we have to send the operation request */
5086  op->phase = PHASE_EXPECT_SE;
5087 
 /* salt_receive is the opposite parity of peer_site */
5088  op->salt_receive = (op->peer_site + 1) % 2;
5089  op->salt_send = op->peer_site; // FIXME?????
5090 
5091 
5093  "Initiating union operation evaluation\n");
5095  "# of total union operations",
5096  1,
5097  GNUNET_NO);
5099  "# of initiated union operations",
5100  1,
5101  GNUNET_NO);
5102  GNUNET_MQ_send (op->mq,
5103  ev);
5104  if (NULL != context)
5106  "sent op request with context message\n");
5107  else
5109  "sent op request without context message\n");
5111  op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5112  op->key_to_element);
5113 
5114  }
5116 }
5117 
5118 
/**
 * Handle a client's request to cancel one of its running operations.  The
 * operation is looked up by request id among the operations of the
 * client's set; cancelling an unknown operation is not an error.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_cancel -- confirm), as are several
 * call lines inside the body.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the cancel message carrying the request id
 */
5125 static void
5127  const struct GNUNET_SETU_CancelMessage *msg)
5128 {
5129  struct ClientState *cs = cls;
5130  struct Set *set;
5131  struct Operation *op;
5132  int found;
5133 
5134  if (NULL == (set = cs->set))
5135  {
5136  /* client without a set requested an operation */
5137  GNUNET_break (0);
5139  return;
5140  }
5141  found = GNUNET_NO;
 /* linear search; on success `op` is left pointing at the match */
5142  for (op = set->ops_head; NULL != op; op = op->next)
5143  {
5144  if (op->client_request_id == ntohl (msg->request_id))
5145  {
5146  found = GNUNET_YES;
5147  break;
5148  }
5149  }
5150  if (GNUNET_NO == found)
5151  {
5152  /* It may happen that the operation was already destroyed due to
5153  * the other peer disconnecting. The client may not know about this
5154  * yet and try to cancel the (just barely non-existent) operation.
5155  * So this is not a hard error.
5156  *///
5158  "Client canceled non-existent op %u\n",
5159  (uint32_t) ntohl (msg->request_id));
5160  }
5161  else
5162  {
5164  "Client requested cancel for op %u\n",
5165  (uint32_t) ntohl (msg->request_id));
5167  }
5169 }
5170 
5171 
/**
 * Handle a client's acceptance of a suggested incoming operation: moves
 * the operation from its listener to the client's set, takes over the
 * operation parameters from the message, sends the strata estimator to the
 * remote peer and lets CADET continue on the channel.
 *
 * If the suggested operation no longer exists, a result message with
 * status #GNUNET_SETU_STATUS_FAILURE is sent to the client instead.
 *
 * NOTE(review): the line carrying the function name is missing from this
 * listing (presumably handle_client_accept -- confirm), as are several
 * call lines inside the body.
 *
 * @param cls the `struct ClientState` of the accepting client
 * @param msg the accept message with the operation parameters
 */
5180 static void
5182  const struct GNUNET_SETU_AcceptMessage *msg)
5183 {
5184  struct ClientState *cs = cls;
5185  struct Set *set;
5186  struct Operation *op;
5187  struct GNUNET_SETU_ResultMessage *result_message;
5188  struct GNUNET_MQ_Envelope *ev;
5189  struct Listener *listener;
5190 
5191  if (NULL == (set = cs->set))
5192  {
5193  /* client without a set requested to accept */
5194  GNUNET_break (0);
5196  return;
5197  }
5198  op = get_incoming (ntohl (msg->accept_reject_id));
5199  if (NULL == op)
5200  {
5201  /* It is not an error if the set op does not exist -- it may
5202  * have been destroyed when the partner peer disconnected. */
5203  GNUNET_log (
5205  "Client %p accepted request %u of listener %p that is no longer active\n",
5206  cs,
5207  ntohl (msg->accept_reject_id),
5208  cs->listener);
5209  ev = GNUNET_MQ_msg (result_message,
 /* request_id is copied as-is, without byte-order conversion */
5211  result_message->request_id = msg->request_id;
5212  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5213  GNUNET_MQ_send (set->cs->mq, ev);
5215  return;
5216  }
5218  "Client accepting request %u\n",
5219  (uint32_t) ntohl (msg->accept_reject_id));
 /* move the operation from the listener over to the client's set */
5220  listener = op->listener;
5221  op->listener = NULL;
5223  listener->op_tail,
5224  op);
5225  op->set = set;
5227  set->ops_tail,
5228  op);
 /* take over the operation parameters from the client's message */
5229  op->client_request_id = ntohl (msg->request_id);
5230  op->byzantine = msg->byzantine;
5231  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5232  op->force_full = msg->force_full;
5233  op->force_delta = msg->force_delta;
5234  op->symmetric = msg->symmetric;
5235  op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5236  op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5237  op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5238  op->byzantine_upper_bound = msg->byzantine_upper_bond;
5239  op->active_passive_switch_required = false;
5240  /* create hashmap for message control */
5241  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5242  GNUNET_NO);
5243  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5244 
5245 #if MEASURE_PERFORMANCE
5246  /* load config */
5247  load_config (op);
5248 #endif
5249 
5250  /* Advance generation values, so that future mutations do not
5251  interfere with the running operation. */
5252  op->generation_created = set->current_generation;
5253  advance_generation (set);
5254  GNUNET_assert (NULL == op->se);
5255 
5257  "accepting set union operation\n");
5259  "# of accepted union operations",
5260  1,
5261  GNUNET_NO);
5263  "# of total union operations",
5264  1,
5265  GNUNET_NO);
5266  {
5267  struct MultiStrataEstimator *se;
 /* NOTE(review): this local shadows the outer `ev` */
5268  struct GNUNET_MQ_Envelope *ev;
5269  struct StrataEstimatorMessage *strata_msg;
5270  char *buf;
5271  size_t len;
5272  uint16_t type;
5273 
5274  op->se = strata_estimator_dup (op->set->se);
5275  op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5276  GNUNET_NO);
 /* salt_receive is the opposite parity of peer_site */
5277  op->salt_receive = (op->peer_site + 1) % 2;
5278  op->salt_send = op->peer_site; // FIXME?????
5280  op->initial_size = GNUNET_CONTAINER_multihashmap32_size (
5281  op->key_to_element);
5282 
5283  /* kick off the operation */
5284  se = op->se;
5285 
5286  uint8_t se_count = 1;
 /* the guard also protects the division by op->initial_size below */
5287  if (op->initial_size > 0)
5288  {
5289  op->total_elements_size_local = 0;
5290  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5291  &
5293  op);
5295  op->total_elements_size_local / op->initial_size,
5296  op->initial_size);
5297  }
5299  * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5302  se_count,
5303  buf);
5304 #if MEASURE_PERFORMANCE
5305  perf_store.se.sent += 1;
5306  perf_store.se.sent_var_bytes += len;
5307 #endif
5308 
 /* the message type is chosen by comparing the written length with the
    size computed here; the assignments themselves are missing from the
    listing */
5309  if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5312  else
5314  ev = GNUNET_MQ_msg_extra (strata_msg,
5315  len,
5316  type);
5317  GNUNET_memcpy (&strata_msg[1],
5318  buf,
5319  len);
5320  GNUNET_free (buf);
5321  strata_msg->set_size
5323  op->set->content->elements));
5324  strata_msg->se_count = se_count;
5325  GNUNET_MQ_send (op->mq,
5326  ev);
5327  op->phase = PHASE_EXPECT_IBF;
5328  }
5329  /* Now allow CADET to continue, as we did not do this in
5330  #handle_incoming_msg (as we wanted to first see if the
5331  local client would accept the request). */
5332  GNUNET_CADET_receive_done (op->channel);
5334 }
5335 
5336 
/**
 * Task run on shutdown.  If no clients are connected any more, the CADET
 * handle is cleared right away; otherwise that is left to the disconnect
 * callback of the last client.
 *
 * NOTE(review): several call lines of the body are missing from this
 * listing.
 *
 * @param cls closure (unused in the visible lines)
 */
5342 static void
5343 shutdown_task (void *cls)
5344 {
5345  /* Delay actual shutdown to allow service to disconnect clients */
5347  if (0 == num_clients)
5348  {
5349  if (NULL != cadet)
5350  {
5352  cadet = NULL;
5353  }
5354  }
5356  GNUNET_YES);
5358  "handled shutdown request\n");
5359 #if MEASURE_PERFORMANCE
5360  calculate_perf_store ();
5361 #endif
5362 }
5363 
5364 
/**
 * Service initialization.  Logs an error and returns early if the CADET
 * handle could not be set up.
 *
 * NOTE(review): the last parameter line and most call lines of the body
 * are missing from this listing.
 *
 * @param cls closure (unused in the visible lines)
 * @param cfg configuration to use
 */
5373 static void
5374 run (void *cls,
5375  const struct GNUNET_CONFIGURATION_Handle *cfg,
5377 {
5378  /* FIXME: need to modify SERVICE (!) API to allow
5379  us to run a shutdown task *after* clients were
5380  forcefully disconnected! */
5382  NULL);
5384  cfg);
5386  if (NULL == cadet)
5387  {
5389  _ ("Could not connect to CADET service\n"));
5391  return;
5392  }
5393 }
5394 
5395 
/*
 * Service definition: names the service, sets run() as its initialization
 * function and registers the handlers for client messages (accept, set
 * add, create set, evaluate, listen, reject, cancel).
 *
 * NOTE(review): the line with the macro name is missing from this listing
 * (presumably GNUNET_SERVICE_MAIN -- confirm), as are the message type and
 * message struct lines of every handler entry.
 *
 * NOTE(review): the service name given here is "set", while the LOG macro
 * of this file uses the component name "setu" -- confirm which is intended.
 */
5400  "set",
5402  &run,
5405  NULL,
5406  GNUNET_MQ_hd_fixed_size (client_accept,
5409  NULL),
5410  GNUNET_MQ_hd_var_size (client_set_add,
5413  NULL),
5414  GNUNET_MQ_hd_fixed_size (client_create_set,
5417  NULL),
5418  GNUNET_MQ_hd_var_size (client_evaluate,
5421  NULL),
5422  GNUNET_MQ_hd_fixed_size (client_listen,
5425  NULL),
5426  GNUNET_MQ_hd_fixed_size (client_reject,
5429  NULL),
5430  GNUNET_MQ_hd_fixed_size (client_cancel,
5433  NULL),
5435 
5436 
5437 /* end of gnunet-service-setu.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
char * getenv()
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static int res
static void done()
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
uint16_t status
See PRISM_STATUS_*-constants.
uint16_t len
length of data (which is always a uint32_t, but presumably this can be used to specify that fewer byt...
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
UnionOperationPhase
Current phase we are in for a union operation.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
#define IBF_MIN_SIZE
Minimal size of an IBF, based on the BSc thesis of Elias Summermatter (2021).
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
@ MSG_CFS_EXPECTED
Track that receiving this message is expected.
@ MSG_CFS_SENT
Track that a message has been sent.
@ MSG_CFS_UNINITIALIZED
Initial message state.
@ MSG_CFS_RECEIVED
Track that message has been received.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
#define PROBABILITY_FOR_NEW_ROUND
The estimated probability for a new round; this value is based on the BSc thesis of Elias Summerma...
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
MESSAGE_TYPE
Message types to track in message control flow.
@ DEMAND_MESSAGE
Demand message type.
@ OFFER_MESSAGE
Offer message type.
@ ELEMENT_MESSAGE
Element message type.
static enum GNUNET_GenericReturnValue check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates that a message is received in a correct phase.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
#define LOG(kind,...)
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate whether a message in differential sync has already been received before.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
MODE_OF_OPERATION
Different modes of operations.
@ DIFFERENTIAL_SYNC
Mode just synchronizes the difference between sets.
@ FULL_SYNC_LOCAL_SENDING_FIRST
Mode that sends the full set, sending the local set first.
@ FULL_SYNC_REMOTE_SENDING_FIRST
Mode request full set from remote peer.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_EXPECT_IBF_LAST
Continuation for multi part IBFs.
@ PHASE_FULL_RECEIVING
Phase that receives full set first and then sends elements that are the local peer missing.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_FINISHED
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_PASSIVE_DECODING
The other peer is decoding the IBF we just sent.
@ PHASE_ACTIVE_DECODING
We are decoding an IBF.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
static char buf[2048]
static unsigned int ibf_size
static unsigned int element_size
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
#define GNUNET_log(kind,...)
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:53
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
Definition: gnunet_common.h:92
@ GNUNET_OK
Definition: gnunet_common.h:95
@ GNUNET_YES
Definition: gnunet_common.h:97
@ GNUNET_NO
Definition: gnunet_common.h:94
@ GNUNET_SYSERR
Definition: gnunet_common.h:93
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
Definition: crypto_hash.c:321
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
Definition: crypto_hash.c:340
void GNUNET_CRYPTO_hash_context_finish(struct GNUNET_HashContext *hc, struct GNUNET_HashCode *r_hash)
Finish the hash computation.
Definition: crypto_hash.c:364
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1031
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:888
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:837
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:775
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port().
Definition: cadet_api.c:808
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1082
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:910
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:970
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(void)
Create a new configuration object.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Random on unsigned 64-bit values.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:201
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:90
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
There must only be one value per key, but don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
If a value with the given key exists, replace it.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:52
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
Definition: gnunet_mq_lib.h:88
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
Signals other peer that all elements are sent.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:531
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1331
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2325
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2244
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:916
GNUNET_SETU_Status
Status for the result callback.
@ GNUNET_SETU_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETU_STATUS_ADD_REMOTE
Element should be added to the result set of the remote peer, i.e.
@ GNUNET_SETU_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETU_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Update a statistic value for the peer by adding the given delta.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define _(String)
GNU gettext support macro.
Definition: platform.h:177
const char * name
uint32_t number
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:356
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:228
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:323
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:290
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:79
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:167
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:379
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:403
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
uint8_t ibf_get_max_counter(struct InvertibleBloomFilter *ibf)
Returns the minimal bytes needed to store the counter of the IBF.
Definition: ibf.c:298
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
unsigned int generation
First generation that includes this element.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:117
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:86
Message handler for a specific message type.
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:135
Handle to a client that is connected to a service.
Definition: service.c:251
Handle to a service.
Definition: service.c:117
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:79
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:350
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: setu.h:41
Message sent by client to the service to add an element to the set.
Definition: setu.h:326
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:335
uint16_t reserved
For alignment, always zero.
Definition: setu.h:340
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:330
Element stored in a set.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: setu.h:203
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:56
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:158
A request for an operation with another client.
Definition: setu.h:175
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:190
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: setu.h:185
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:290
uint64_t current_size
Current set size.
Definition: setu.h:299
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:310
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:315
uint32_t request_id
ID of the request the result belongs to.
Definition: setu.h:304
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Context for op_get_element_iterator.
struct GNUNET_HashCode hash
FIXME.
struct KeyEntry * k
FIXME.
Message containing buckets of an invertible bloom filter.
Hash of an IBF key.
Definition: ibf.h:55
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:46
uint64_t key_val
Definition: ibf.h:47
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Invertible bloom filter (IBF).
Definition: ibf.h:83
int remote_decoded_count
If an IBF is decoded, this count stores how many elements are on the remote side.
Definition: ibf.h:108
int local_decoded_count
If an IBF is decoded, this count stores how many elements are on the local side.
Definition: ibf.h:101
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
The key entry is used to associate an ibf key with an element.
struct ElementEntry * element
The actual element associated with the key.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
A listener is owned by a client and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
struct StrataEstimator ** stratas
Array of strata estimators.
Operation context used to execute a set operation.
Definition:</