GNUnet  0.11.x
gnunet-service-setu.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
30 #include "ibf.h"
31 #include "gnunet_protocols.h"
32 #include "gnunet_applications.h"
33 #include "gnunet_cadet_service.h"
37 #include <gcrypt.h>
38 #include "gnunet_setu_service.h"
39 #include "setu.h"
40 
41 #define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
42 
47 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
48 
52 #define SE_STRATA_COUNT 32
53 
54 
59 #define SE_IBFS_TOTAL_SIZE 632
60 
64 #define SE_IBF_HASH_NUM 3
65 
69 #define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
70 
76 #define MAX_IBF_SIZE 1048576
77 
78 
83 #define IBF_MIN_SIZE 37
84 
89 #define DIFFERENTIAL_RTT_MEAN 3.65145
90 
95 #define SECURITY_LEVEL 80
96 
102 #define PROBABILITY_FOR_NEW_ROUND 0.15
103 
108 #define MEASURE_PERFORMANCE 0
109 
110 
115 {
120 
131 
136 
141 
146 
152 
158 
164 
170 
176 };
177 
183 {
188 
193 
198 };
199 
200 
207 struct ElementEntry
208 {
214 
220 
224  unsigned int generation;
225 
230  int remote;
231 };
232 
233 
238 struct Listener;
239 
240 
244 struct Set;
245 
246 
/**
 * Per-client state of the service: the client handle, its message queue
 * and the set and/or listener tied to that client.
 * NOTE(review): whether `set` and `listener` are mutually exclusive is not
 * visible in this part of the file -- confirm against the client handlers.
 */
250 struct ClientState
251 {
  /* Set associated with this client; struct Set keeps the matching back
     pointer in its `cs` member. */
255  struct Set *set;
256 
  /* Listener associated with this client; struct Listener keeps the
     matching back pointer in its `cs` member. */
260  struct Listener *listener;
261 
  /* Service-level handle of the client. */
265  struct GNUNET_SERVICE_Client *client;
266 
  /* Message queue towards the client; result messages of an operation are
     sent through it (op->set->cs->mq). */
270  struct GNUNET_MQ_Handle *mq;
271 };
272 
273 
277 struct Operation
278 {
279 
284  struct GNUNET_PeerIdentity peer;
285 
289  uint64_t initial_size;
290 
294  struct Operation *next;
295 
299  struct Operation *prev;
300 
304  struct GNUNET_CADET_Channel *channel;
305 
309  struct Listener *listener;
310 
314  struct GNUNET_MQ_Handle *mq;
315 
319  struct GNUNET_MessageHeader *context_msg;
320 
325  struct Set *set;
326 
332 
337 
342 
349 
355 
360 
365 
369  int client_done_sent;
370 
375 
379  uint32_t salt_send;
380 
384  uint32_t salt_receive;
385 
390  uint32_t received_fresh;
391 
395  uint32_t received_total;
396 
400  uint32_t salt;
401 
405  uint32_t remote_element_count;
406 
410  uint32_t client_request_id;
411 
416  int force_delta;
417 
422  int force_full;
423 
428  int byzantine;
429 
435 
441 
447  uint32_t suggest_id;
448 
453  unsigned int generation_created;
454 
455 
460 
461 
466 
467 
473 
479  uint8_t peer_site;
480 
481 
486 
491 
496 
501 
502 
507 
512 
517 
521  uint64_t remote_set_diff;
522 
526  uint64_t local_set_diff;
527 
532 };
533 
534 
539 struct SetContent
540 {
544  struct GNUNET_CONTAINER_MultiHashMap *elements;
545 
550 
555 
559  unsigned int refcount;
560 
564  unsigned int latest_generation;
565 
569  int iterator_count;
570 };
571 
572 
576 struct Set
577 {
581  struct Set *next;
582 
586  struct Set *prev;
587 
592  struct ClientState *cs;
593 
598  struct SetContent *content;
599 
605 
609  struct Operation *ops_head;
610 
614  struct Operation *ops_tail;
615 
620  unsigned int current_generation;
621 
622 };
623 
624 
/**
 * Pairs an IBF key with the element it was derived from.  The registration
 * code in this file fills a fresh entry with the key obtained from
 * get_ibf_key (&ee->element_hash), the element pointer and a `received`
 * flag.
 */
628 struct KeyEntry
629 {
  /* IBF key of the element, derived from the element's hash. */
633  struct IBF_Key ibf_key;
634 
  /* Element this key stands for.  The entry's cleanup code additionally
     frees the element itself when element->remote is GNUNET_YES. */
641  struct ElementEntry *element;
642 
  /* Copy of the `received` argument given at registration time; elements
     taken from the local set are registered with GNUNET_NO.
     NOTE(review): presumably marks elements obtained from the remote peer
     -- confirm with the code that registers received elements. */
648  int received;
649 };
650 
651 
/**
 * Closure bundling an IBF key with an operation.
 * NOTE(review): the users of this struct are outside the visible part of
 * the file; presumably it is handed to an iterator that sends the elements
 * matching `ibf_key` for `op` -- confirm.
 */
656 struct SendElementClosure
657 {
  /* IBF key the closure refers to. */
662  struct IBF_Key ibf_key;
663 
  /* Operation the closure belongs to. */
668  struct Operation *op;
669 };
670 
671 
/**
 * A listener, kept as a node of the doubly linked list anchored at the
 * file-level `listener_head` / `listener_tail` pointers.
 */
676 struct Listener
677 {
  /* Next listener in the doubly linked list. */
681  struct Listener *next;
682 
  /* Previous listener in the doubly linked list. */
686  struct Listener *prev;
687 
  /* Head of the list of operations attached to this listener. */
693  struct Operation *op_head;
694 
  /* Tail of the list of operations attached to this listener; the
     incoming-operation teardown unlinks an operation from this list and
     resets its op->listener to NULL. */
700  struct Operation *op_tail;
701 
  /* State of the client this listener belongs to (counterpart of
     ClientState.listener). */
706  struct ClientState *cs;
707 
  /* CADET port handle.
     NOTE(review): presumably the port opened for `app_id` -- confirm. */
711  struct GNUNET_CADET_Port *open_port;
712 
  /* Hash stored per listener.
     NOTE(review): presumably the application identifier incoming requests
     are matched against -- confirm. */
717  struct GNUNET_HashCode app_id;
718 
719 };
720 
721 
726 static struct GNUNET_CADET_Handle *cadet;
727 
732 
736 static struct Listener *listener_head;
737 
741 static struct Listener *listener_tail;
742 
746 static unsigned int num_clients;
747 
752 static int in_shutdown;
753 
759 static uint32_t suggest_id;
760 
761 #if MEASURE_PERFORMANCE
762 
766 static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767 
768 
/**
 * Traffic counters for one message type (only compiled when
 * MEASURE_PERFORMANCE is enabled).  sum_sent_received_bytes() combines them
 * as size * sent + size * received + sent_var_bytes + received_var_bytes.
 */
774 struct perf_num_send_received_msg
775 {
  /* Number of messages sent (incremented by one per message). */
776  uint64_t sent;
  /* Bytes sent in addition to the per-message size, e.g. the bucket
     payload of an IBF message. */
777  uint64_t sent_var_bytes;
  /* Number of messages received. */
778  uint64_t received;
  /* Bytes received in addition to the per-message size. */
779  uint64_t received_var_bytes;
780 };
781 
/**
 * Collection of all performance counters of a run: one
 * perf_num_send_received_msg per message type plus a few scalar values.
 * calculate_perf_store() turns them into one line of "perf_data.csv".
 */
785 struct per_store_struct
786 {
  /* Per-message-type traffic counters. */
787  struct perf_num_send_received_msg operation_request;
788  struct perf_num_send_received_msg se;
789  struct perf_num_send_received_msg request_full;
790  struct perf_num_send_received_msg element_full;
791  struct perf_num_send_received_msg full_done;
792  struct perf_num_send_received_msg ibf;
793  struct perf_num_send_received_msg inquery;
794  struct perf_num_send_received_msg element;
795  struct perf_num_send_received_msg demand;
796  struct perf_num_send_received_msg offer;
797  struct perf_num_send_received_msg done;
798  struct perf_num_send_received_msg over;
  /* Set-difference figures written to the CSV line.
     NOTE(review): presumably the strata-estimator results (total, remote
     part, local part) -- confirm where they are assigned. */
799  uint64_t se_diff;
800  uint64_t se_diff_remote;
801  uint64_t se_diff_local;
  /* Used by calculate_perf_store() as the iteration count for the RTT
     estimate (0.5 per switch); a value of 0 is reported as "decoded". */
802  uint64_t active_passive_switches;
  /* Mode-of-operation code written to the CSV line. */
803  uint8_t mode_of_operation;
804 };
805 
806 struct per_store_struct perf_store;
807 #endif
808 
814 {
819 
824 
829 
834 };
835 
841 {
846 
851 
856 };
857 
858 
863 {
876 };
877 
878 
879 #if MEASURE_PERFORMANCE
880 
886 static void
887 load_config (struct Operation *op)
888 {
889  long long number;
890  float fl;
891 
892  setu_cfg = GNUNET_CONFIGURATION_create ();
893  GNUNET_CONFIGURATION_load (setu_cfg,
894  "perf_setu.conf");
896  "IBF",
897  "BUCKET_NUMBER_FACTOR",
898  &fl);
899  op->ibf_bucket_number_factor = fl;
901  "IBF",
902  "NUMBER_PER_BUCKET",
903  &number);
906  "PERFORMANCE",
907  "TRADEOFF",
908  &number);
911  "BOUNDARIES",
912  "UPPER_ELEMENT",
913  &number);
915  op->peer_site = 0;
916 }
917 
918 
925 static int
926 sum_sent_received_bytes (uint64_t size,
927  struct perf_num_send_received_msg
928  perf_num_send_received_msg)
929 {
930  return (size * perf_num_send_received_msg.sent)
931  + (size * perf_num_send_received_msg.received)
932  + perf_num_send_received_msg.sent_var_bytes
933  + perf_num_send_received_msg.received_var_bytes;
934 }
935 
936 
940 static void
941 calculate_perf_store ()
942 {
943 
947  float rtt = 1;
948  int bytes_transmitted = 0;
949 
953  if ((perf_store.element_full.received != 0) ||
954  (perf_store.element_full.sent != 0)
955  )
956  rtt += 1;
957 
958  if ((perf_store.request_full.received != 0) ||
959  (perf_store.request_full.sent != 0)
960  )
961  rtt += 0.5;
962 
967  if ((perf_store.element.received != 0) ||
968  (perf_store.element.sent != 0))
969  {
970  int iterations = perf_store.active_passive_switches;
971 
972  if (iterations > 0)
973  rtt += iterations * 0.5;
974  rtt += 2.5;
975  }
976 
977 
981  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983  perf_store.request_full);
984 
985  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987  perf_store.element_full);
988  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
990  perf_store.element);
991  // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
994  perf_store.se);
995  bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996  bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997  perf_store.ibf);
998  bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999  perf_store.inquery);
1000  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1002  perf_store.demand);
1003  bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1005  perf_store.offer);
1006  bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007 
1011  float factor;
1012  GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013  &factor);
1014  long long num_per_bucket;
1015  GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016  &num_per_bucket);
1017 
1018 
1019  int decoded = 0;
1020  if (perf_store.active_passive_switches == 0)
1021  decoded = 1;
1022  int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023  IBFMessage),
1024  perf_store.ibf);
1025 
1026  FILE *out1 = fopen ("perf_data.csv", "a");
1027  fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028  decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029  bytes_transmitted,
1030  perf_store.se_diff_local,perf_store.se_diff_remote,
1031  perf_store.mode_of_operation);
1032  fclose (out1);
1033 
1034 }
1035 
1036 
1037 #endif
1038 
1050 static uint8_t
1051 estimate_best_mode_of_operation (uint64_t avg_element_size,
1052  uint64_t local_set_size,
1053  uint64_t remote_set_size,
1054  uint64_t est_set_diff_remote,
1055  uint64_t est_set_diff_local,
1056  uint64_t bandwith_latency_tradeoff,
1057  uint64_t ibf_bucket_number_factor)
1058 {
1059 
1060  /*
1061  * In case of initial sync fall to predefined states
1062  */
1063 
1064  if (0 == local_set_size)
1066  if (0 == remote_set_size)
1068 
1069  /*
1070  * Calculate bytes for full Sync
1071  */
1072 
1073  uint8_t sizeof_full_done_header = 4;
1074  uint8_t sizeof_done_header = 4;
1075  uint8_t rtt_min_full = 2;
1076  uint8_t sizeof_request_full = 4;
1077  uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078 
1079  /* Estimate byte required if we send first */
1080  uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081  + local_set_size;
1082 
1083  uint64_t total_bytes_full_local_send_first = (avg_element_size
1084  *
1085  total_elements_to_send_local_send_first) \
1086  + (
1087  total_elements_to_send_local_send_first * sizeof(struct
1089  + (sizeof_full_done_header * 2) \
1090  + rtt_min_full
1091  * bandwith_latency_tradeoff;
1092 
1093  /* Estimate bytes required if we request from remote peer */
1094  uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095  + remote_set_size;
1096 
1097  uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098  *
1099  total_elements_to_send_remote_send_first) \
1100  + (
1101  total_elements_to_send_remote_send_first * sizeof(struct
1103  + (sizeof_full_done_header * 2) \
1104  + (rtt_min_full + 0.5)
1105  * bandwith_latency_tradeoff \
1106  + sizeof_request_full;
1107 
1108  /*
1109  * Calculate bytes for differential Sync
1110  */
1111 
1112  /* Estimate bytes required by IBF transmission*/
1113 
1114  long double ibf_bucket_count = estimated_total_diff
1115  * ibf_bucket_number_factor;
1116 
1117  if (ibf_bucket_count <= IBF_MIN_SIZE)
1118  {
1119  ibf_bucket_count = IBF_MIN_SIZE;
1120  }
1121  uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1123 
1124  uint64_t estimated_counter_size = ceil (
1125  MIN (2 * log2l ((float) local_set_size / ibf_bucket_count), log2l (
1126  local_set_size)));
1127 
1128  long double counter_bytes = (float) estimated_counter_size / 8;
1129 
1130  uint64_t ibf_bytes = ceil ((sizeof(struct IBFMessage) * ibf_message_count)
1131  * 1.2 \
1132  + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1133  + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1134  * 1.2 \
1135  + (ibf_bucket_count * counter_bytes) * 1.2);
1136 
1137  /* Estimate full byte count for differential sync */
1138  uint64_t element_size = (avg_element_size + sizeof(struct
1140  * estimated_total_diff;
1141  uint64_t done_size = sizeof_done_header;
1142  uint64_t inquery_size = (sizeof(struct IBF_Key) + sizeof(struct
1143  InquiryMessage))
1144  * estimated_total_diff;
1145  uint64_t demand_size =
1146  (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1147  * estimated_total_diff;
1148  uint64_t offer_size = (sizeof(struct GNUNET_HashCode) + sizeof(struct
1149  GNUNET_MessageHeader))
1150  * estimated_total_diff;
1151 
1152  uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1153  + demand_size + offer_size + ibf_bytes) \
1155  * bandwith_latency_tradeoff);
1156 
1157  uint64_t full_min = MIN (total_bytes_full_local_send_first,
1158  total_bytes_full_remote_send_first);
1159 
1160  /* Decide between full and differential sync */
1161 
1162  if (full_min < total_bytes_diff)
1163  {
1164  /* Decide between sending all element first or receiving all elements */
1165  if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1166  {
1168  }
1169  else
1170  {
1172  }
1173  }
1174  else
1175  {
1176  return DIFFERENTIAL_SYNC;
1177  }
1178 }
1179 
1180 
1189 static int
1190 check_valid_phase (const uint8_t allowed_phases[], size_t size_phases, struct
1191  Operation *op)
1192 {
1196  for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1197  {
1198  uint8_t phase = allowed_phases[phase_ctr];
1199  if (phase == op->phase)
1200  {
1202  "Message received in valid phase\n");
1203  return GNUNET_YES;
1204  }
1205  }
1207  "Received message in invalid phase: %u\n", op->phase);
1208  return GNUNET_NO;
1209 }
1210 
1211 
1223 static int
1225  enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1226  const struct GNUNET_HashCode *hash_code,
1227  enum MESSAGE_TYPE mt)
1228 {
1229  struct messageControlFlowElement *cfe = NULL;
1230  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1231 
1236  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1237  if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1238  {
1239  if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1240  {
1242  "Received an element without sent offer!\n");
1243  return GNUNET_NO;
1244  }
1245  /* Check that only requested elements are received! */
1246  if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1247  MSG_CFS_SENT))
1248  {
1250  "Received an element that was not demanded\n");
1251  return GNUNET_NO;
1252  }
1253  }
1254 
1259  if (NULL == cfe)
1260  {
1261  cfe = GNUNET_new (struct messageControlFlowElement);
1262  if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1263  cfe,
1265  {
1266  GNUNET_free (cfe);
1267  return GNUNET_SYSERR;
1268  }
1269  }
1270 
1275  if (OFFER_MESSAGE == mt)
1276  {
1277  mcfs = &cfe->offer;
1278  }
1279  else if (DEMAND_MESSAGE == mt)
1280  {
1281  mcfs = &cfe->demand;
1282  }
1283  else if (ELEMENT_MESSAGE == mt)
1284  {
1285  mcfs = &cfe->element;
1286  }
1287  else
1288  {
1289  return GNUNET_SYSERR;
1290  }
1291 
1296  if (new_mcfs <= *mcfs)
1297  {
1298  return GNUNET_NO;
1299  }
1300 
1301  *mcfs = new_mcfs;
1302  return GNUNET_YES;
1303 }
1304 
1305 
1313 static int
1316  struct GNUNET_HashCode *hash_code,
1317  enum MESSAGE_TYPE mt)
1318 {
1319  struct messageControlFlowElement *cfe = NULL;
1320  enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1321 
1322  cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1323 
1328  if (cfe != NULL)
1329  {
1330  if (OFFER_MESSAGE == mt)
1331  {
1332  mcfs = &cfe->offer;
1333  }
1334  else if (DEMAND_MESSAGE == mt)
1335  {
1336  mcfs = &cfe->demand;
1337  }
1338  else if (ELEMENT_MESSAGE == mt)
1339  {
1340  mcfs = &cfe->element;
1341  }
1342  else
1343  {
1344  return GNUNET_SYSERR;
1345  }
1346 
1350  if (*mcfs != MSG_CFS_UNINITIALIZED)
1351  {
1352  return GNUNET_YES;
1353  }
1354  }
1355  return GNUNET_NO;
1356 }
1357 
1358 
1369 static int
1371  const struct GNUNET_HashCode *key,
1372  void *value)
1373 {
1374  struct messageControlFlowElement *mcfe = value;
1375 
1376  if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1377  {
1378  return GNUNET_YES;
1379  }
1380  return GNUNET_NO;
1381 }
1382 
1383 
1393 static int
1395  const struct GNUNET_HashCode *key,
1396  void *value)
1397 {
1398  struct Operation *op = cls;
1399  struct GNUNET_SETU_Element *element = value;
1400  op->total_elements_size_local += element->size;
1401  return GNUNET_YES;
1402 }
1403 
1404 
1414 static int
1416  const struct GNUNET_HashCode *key,
1417  void *value)
1418 {
1419  struct Operation *op = cls;
1420 
1421  struct GNUNET_HashContext *hashed_key_context =
1423  struct GNUNET_HashCode new_key;
1424 
1428  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1429  &key,
1430  sizeof(struct IBF_Key));
1431  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1433  sizeof(uint32_t));
1434  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1435  &new_key);
1437  &new_key,value,
1439  return GNUNET_YES;
1440 }
1441 
1442 
1453 static int
1455  uint32_t key,
1456  void *value)
1457 {
1458  struct KeyEntry *k = value;
1459 
1460  GNUNET_assert (NULL != k);
1461  if (GNUNET_YES == k->element->remote)
1462  {
1463  GNUNET_free (k->element);
1464  k->element = NULL;
1465  }
1466  GNUNET_free (k);
1467  return GNUNET_YES;
1468 }
1469 
1470 
1477 static void
1478 send_client_done (void *cls)
1479 {
1480  struct Operation *op = cls;
1481  struct GNUNET_MQ_Envelope *ev;
1482  struct GNUNET_SETU_ResultMessage *rm;
1483 
1484  if (GNUNET_YES == op->client_done_sent)
1485  return;
1486  if (PHASE_FINISHED != op->phase)
1487  {
1489  "Union operation failed\n");
1490  GNUNET_STATISTICS_update (_GSS_statistics,
1491  "# Union operations failed",
1492  1,
1493  GNUNET_NO);
1496  rm->request_id = htonl (op->client_request_id);
1497  rm->element_type = htons (0);
1498  GNUNET_MQ_send (op->set->cs->mq,
1499  ev);
1500  return;
1501  }
1502 
1504 
1505  GNUNET_STATISTICS_update (_GSS_statistics,
1506  "# Union operations succeeded",
1507  1,
1508  GNUNET_NO);
1510  "Signalling client that union operation is done\n");
1511  ev = GNUNET_MQ_msg (rm,
1513  rm->request_id = htonl (op->client_request_id);
1514  rm->result_status = htons (GNUNET_SETU_STATUS_DONE);
1515  rm->element_type = htons (0);
1517  op->key_to_element));
1518  GNUNET_MQ_send (op->set->cs->mq,
1519  ev);
1520 }
1521 
1522 
1529 static int
1531 {
1532  if (op->byzantine != GNUNET_YES)
1533  return GNUNET_OK;
1534 
1538  if (op->remote_element_count + op->remote_set_diff >
1540  return GNUNET_SYSERR;
1542  return GNUNET_SYSERR;
1543 
1548  return GNUNET_SYSERR;
1549  return GNUNET_OK;
1550 }
1551 
1552 
1553 /* FIXME: the destroy logic is a mess and should be cleaned up! */
1554 
1567 static void
1569 {
1570  struct Set *set = op->set;
1571  struct GNUNET_CADET_Channel *channel;
1572 
1574  "Destroying union operation %p\n",
1575  op);
1576  GNUNET_assert (NULL == op->listener);
1577  /* check if the op was canceled twice */
1578  if (NULL != op->remote_ibf)
1579  {
1580  ibf_destroy (op->remote_ibf);
1581  op->remote_ibf = NULL;
1582  }
1583  if (NULL != op->demanded_hashes)
1584  {
1586  op->demanded_hashes = NULL;
1587  }
1588  if (NULL != op->local_ibf)
1589  {
1590  ibf_destroy (op->local_ibf);
1591  op->local_ibf = NULL;
1592  }
1593  if (NULL != op->se)
1594  {
1596  op->se = NULL;
1597  }
1598  if (NULL != op->key_to_element)
1599  {
1602  NULL);
1604  op->key_to_element = NULL;
1605  }
1606  if (NULL != set)
1607  {
1608  GNUNET_CONTAINER_DLL_remove (set->ops_head,
1609  set->ops_tail,
1610  op);
1611  op->set = NULL;
1612  }
1613  if (NULL != op->context_msg)
1614  {
1615  GNUNET_free (op->context_msg);
1616  op->context_msg = NULL;
1617  }
1618  if (NULL != (channel = op->channel))
1619  {
1620  /* This will free op; called conditionally as this helper function
1621  is also called from within the channel disconnect handler. */
1622  op->channel = NULL;
1623  GNUNET_CADET_channel_destroy (channel);
1624  }
1625  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1626  * there was a channel end handler that will free 'op' on the call stack. */
1627 }
1628 
1629 
1635 static void
1637 
1638 
1644 static void
1646 {
1647  struct Listener *listener;
1648 
1650  "Destroying incoming operation %p\n",
1651  op);
1652  if (NULL != (listener = op->listener))
1653  {
1655  listener->op_tail,
1656  op);
1657  op->listener = NULL;
1658  }
1659  if (NULL != op->timeout_task)
1660  {
1662  op->timeout_task = NULL;
1663  }
1665 }
1666 
1667 
1673 static void
1675 {
1676  struct GNUNET_CADET_Channel *channel;
1677 
1678  if (NULL != (channel = op->channel))
1679  {
1680  /* This will free op; called conditionally as this helper function
1681  is also called from within the channel disconnect handler. */
1682  op->channel = NULL;
1683  GNUNET_CADET_channel_destroy (channel);
1684  }
1685  if (NULL != op->listener)
1686  {
1687  incoming_destroy (op);
1688  return;
1689  }
1690  if (NULL != op->set)
1691  send_client_done (op);
1693  GNUNET_free (op);
1694 }
1695 
1696 
1703 static void
1705 {
1706  struct GNUNET_MQ_Envelope *ev;
1708 
1710  "union operation failed\n");
1713  msg->request_id = htonl (op->client_request_id);
1714  msg->element_type = htons (0);
1715  GNUNET_MQ_send (op->set->cs->mq,
1716  ev);
1718 }
1719 
1720 
1731 static void
1733 {
1734  if (GNUNET_YES != op->byzantine)
1735  return;
1736 
1737  int security_level_lb = -1 * SECURITY_LEVEL;
1738  uint64_t duplicates = op->received_fresh - op->received_total;
1739 
1740  /*
1741  * Protect full sync from receiving double element when in FULL SENDING
1742  */
1743  if (PHASE_FULL_SENDING == op->phase)
1744  {
1745  if (duplicates > 0)
1746  {
1748  "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1749  "mode of operation this is not allowed! Duplicates: %llu\n",
1750  (unsigned long long) duplicates);
1751  GNUNET_break_op (0);
1752  fail_union_operation (op);
1753  return;
1754  }
1755 
1756  }
1757 
1758  /*
1759  * Protect full sync with probabilistic algorithm
1760  */
1761  if (PHASE_FULL_RECEIVING == op->phase)
1762  {
1763  if (0 == op->remote_set_diff)
1764  op->remote_set_diff = 1;
1765 
1766  long double base = (1 - (long double) (op->remote_set_diff
1767  / (long double) (op->initial_size
1768  + op->
1769  remote_set_diff)));
1770  long double exponent = (op->received_total - (op->received_fresh * ((long
1771  double)
1772  op->
1773  initial_size
1774  / (long
1775  double)
1776  op->
1777  remote_set_diff)));
1778  long double value = exponent * (log2l (base) / log2l (2));
1779  if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1780  {
1782  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1783  "to many duplicated full element : %LF\n",
1784  value);
1785  GNUNET_break_op (0);
1786  fail_union_operation (op);
1787  return;
1788  }
1789  }
1790 }
1791 
1792 
1797 static void
1799 {
1800  double probability = op->differential_sync_iterations * (log2l (
1802  / log2l (2));
1803  if ((-1 * SECURITY_LEVEL) > probability)
1804  {
1806  "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1807  "switches in differential sync: %u\n",
1809  GNUNET_break_op (0);
1810  fail_union_operation (op);
1811  return;
1812  }
1813 }
1814 
1815 
1823 static struct IBF_Key
1824 get_ibf_key (const struct GNUNET_HashCode *src)
1825 {
1826  struct IBF_Key key;
1827  uint16_t salt = 0;
1828 
1830  GNUNET_CRYPTO_kdf (&key, sizeof(key),
1831  src, sizeof *src,
1832  &salt, sizeof(salt),
1833  NULL, 0));
1834  return key;
1835 }
1836 
1837 
/**
 * Closure used while looking up the KeyEntry that belongs to an element
 * hash: the lookup stores the wanted hash in `hash`, and the iterator
 * writes the match to `k`.
 */
1841 struct GetElementContext
1842 {
  /* Hash of the element that is being searched for. */
1846  struct GNUNET_HashCode hash;
1847 
  /* Result slot: set by the iterator to the matching entry, after which
     the iteration is stopped; starts out as 0 (NULL). */
1851  struct KeyEntry *k;
1852 };
1853 
1854 
1865 static int
1867  uint32_t key,
1868  void *value)
1869 {
1870  struct GetElementContext *ctx = cls;
1871  struct KeyEntry *k = value;
1872 
1873  GNUNET_assert (NULL != k);
1875  &ctx->hash))
1876  {
1877  ctx->k = k;
1878  return GNUNET_NO;
1879  }
1880  return GNUNET_YES;
1881 }
1882 
1883 
1892 static struct KeyEntry *
1894  const struct GNUNET_HashCode *element_hash)
1895 {
1896  int ret;
1897  struct IBF_Key ibf_key;
1898  struct GetElementContext ctx = { { { 0 } }, 0 };
1899 
1900  ctx.hash = *element_hash;
1901 
1902  ibf_key = get_ibf_key (element_hash);
1904  (uint32_t) ibf_key.key_val,
1906  &ctx);
1907 
1908  /* was the iteration aborted because we found the element? */
1909  if (GNUNET_SYSERR == ret)
1910  {
1911  GNUNET_assert (NULL != ctx.k);
1912  return ctx.k;
1913  }
1914  return NULL;
1915 }
1916 
1917 
1932 static void
1934  struct ElementEntry *ee,
1935  int received)
1936 {
1937  struct IBF_Key ibf_key;
1938  struct KeyEntry *k;
1939 
1940  ibf_key = get_ibf_key (&ee->element_hash);
1941  k = GNUNET_new (struct KeyEntry);
1942  k->element = ee;
1943  k->ibf_key = ibf_key;
1944  k->received = received;
1947  (uint32_t) ibf_key.key_val,
1948  k,
1950 }
1951 
1952 
1957 static void
1958 salt_key (const struct IBF_Key *k_in,
1959  uint32_t salt,
1960  struct IBF_Key *k_out)
1961 {
1962  int s = (salt * 7) % 64;
1963  uint64_t x = k_in->key_val;
1964 
1965  /* rotate ibf key */
1966  x = (x >> s) | (x << (64 - s));
1967  k_out->key_val = x;
1968 }
1969 
1970 
1974 static void
1975 unsalt_key (const struct IBF_Key *k_in,
1976  uint32_t salt,
1977  struct IBF_Key *k_out)
1978 {
1979  int s = (salt * 7) % 64;
1980  uint64_t x = k_in->key_val;
1981 
1982  x = (x << s) | (x >> (64 - s));
1983  k_out->key_val = x;
1984 }
1985 
1986 
1994 static int
1996  uint32_t key,
1997  void *value)
1998 {
1999  struct Operation *op = cls;
2000  struct KeyEntry *ke = value;
2001  struct IBF_Key salted_key;
2002 
2004  "[OP %p] inserting %lx (hash %s) into ibf\n",
2005  op,
2006  (unsigned long) ke->ibf_key.key_val,
2007  GNUNET_h2s (&ke->element->element_hash));
2008  salt_key (&ke->ibf_key,
2009  op->salt_send,
2010  &salted_key);
2011  ibf_insert (op->local_ibf, salted_key);
2012  return GNUNET_YES;
2013 }
2014 
2015 
2023 static int
2025  struct Operation *op)
2026 {
2027  return ee->generation >= op->generation_created;
2028 }
2029 
2030 
2041 static int
2043  const struct GNUNET_HashCode *key,
2044  void *value)
2045 {
2046  struct Operation *op = cls;
2047  struct ElementEntry *ee = value;
2048 
2049  /* make sure that the element belongs to the set at the time
2050  * of creating the operation */
2051  if (GNUNET_NO ==
2053  op))
2054  return GNUNET_YES;
2055  GNUNET_assert (GNUNET_NO == ee->remote);
2056  op_register_element (op,
2057  ee,
2058  GNUNET_NO);
2059  return GNUNET_YES;
2060 }
2061 
2062 
2068 static void
2070 {
2071  unsigned int len;
2072 
2073  GNUNET_assert (NULL == op->key_to_element);
2078  op);
2079 }
2080 
2081 
2090 static int
2092  uint32_t size)
2093 {
2094  GNUNET_assert (NULL != op->key_to_element);
2095 
2096  if (NULL != op->local_ibf)
2097  ibf_destroy (op->local_ibf);
2098  // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2099  op->local_ibf = ibf_create (size,
2100  ((uint8_t) op->ibf_number_buckets_per_element));
2101  if (NULL == op->local_ibf)
2102  {
2104  "Failed to allocate local IBF\n");
2105  return GNUNET_SYSERR;
2106  }
2109  op);
2110  return GNUNET_OK;
2111 }
2112 
2113 
2123 static int
2124 send_ibf (struct Operation *op,
2125  uint32_t ibf_size)
2126 {
2127  uint64_t buckets_sent = 0;
2128  struct InvertibleBloomFilter *ibf;
2130 
2134  uint32_t ibf_min_size = IBF_MIN_SIZE;
2135 
2136  if (ibf_size < ibf_min_size)
2137  {
2138  ibf_size = ibf_min_size;
2139  }
2140  if (GNUNET_OK !=
2141  prepare_ibf (op, ibf_size))
2142  {
2143  /* allocation failed */
2144  return GNUNET_SYSERR;
2145  }
2146 
2148  "sending ibf of size %u\n",
2149  1 << ibf_size);
2150 
2151  {
2152  char name[64];
2153  GNUNET_snprintf (name, sizeof(name), "# sent IBF (order %u)", ibf_size);
2154  GNUNET_STATISTICS_update (_GSS_statistics, name, 1, GNUNET_NO);
2155  }
2156 
2157  ibf = op->local_ibf;
2158 
2159  while (buckets_sent < ibf_size)
2160  {
2161  unsigned int buckets_in_message;
2162  struct GNUNET_MQ_Envelope *ev;
2163  struct IBFMessage *msg;
2164 
2165  buckets_in_message = ibf_size - buckets_sent;
2166  /* limit to maximum */
2167  if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2168  buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2169 
2170 #if MEASURE_PERFORMANCE
2171  perf_store.ibf.sent += 1;
2172  perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2173 #endif
2174  ev = GNUNET_MQ_msg_extra (msg,
2175  buckets_in_message * IBF_BUCKET_SIZE,
2177  msg->ibf_size = ibf_size;
2178  msg->offset = htonl (buckets_sent);
2179  msg->salt = htonl (op->salt_send);
2181 
2182 
2183  ibf_write_slice (ibf, buckets_sent,
2184  buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2185  buckets_sent += buckets_in_message;
2187  "ibf chunk size %u, %llu/%u sent\n",
2188  (unsigned int) buckets_in_message,
2189  (unsigned long long) buckets_sent,
2190  (unsigned int) ibf_size);
2191  GNUNET_MQ_send (op->mq, ev);
2192  }
2193 
2194  /* The other peer must decode the IBF, so
2195  * we're passive. */
2197  return GNUNET_OK;
2198 }
2199 
2200 
/**
 * Derive an IBF size from an estimated set difference.
 *
 * The difference is scaled by @a ibf_bucket_number_factor, truncated to an
 * integer and forced to be odd by setting the lowest bit.
 *
 * @param diff estimated number of differing elements
 * @param number_buckets_per_element not used by the computation
 * @param ibf_bucket_number_factor multiplier applied to @a diff
 * @return odd size value (at least 1)
 */
static unsigned int
get_size_from_difference (unsigned int diff, int number_buckets_per_element,
                          float ibf_bucket_number_factor)
{
  int scaled_size = (int) (diff * ibf_bucket_number_factor);

  /* setting bit 0 turns an even size into the next odd one */
  scaled_size |= 1;
  return scaled_size;
}
2217 
2218 
/**
 * Compute the size of the next IBF after a decoding round.
 *
 * The estimate is twice the previous size minus the decoded elements
 * scaled by @a ibf_bucket_number_factor, truncated to an integer and forced
 * to be odd.  If the estimate is not positive (many elements decoded from a
 * small IBF) the smallest odd size, 1, is returned instead of converting a
 * negative floating point value to an unsigned integer, which is undefined.
 * send_ibf() raises sizes below IBF_MIN_SIZE to that minimum.
 *
 * @param ibf_bucket_number_factor multiplier applied to the decoded elements
 * @param decoded_elements number of elements decoded so far
 * @param last_ibf_size size of the previous IBF
 * @return odd size value for the next IBF (at least 1)
 */
static unsigned int
get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
                   decoded_elements, unsigned int last_ibf_size)
{
  float estimate = (last_ibf_size * 2)
                   - (ibf_bucket_number_factor * decoded_elements);

  /* everything below 1 (including negative values and NaN) truncates to
   * "no buckets"; report the smallest odd size without casting */
  if (! (estimate >= 1.0f))
    return 1;
  return ((unsigned int) estimate) | 1;
}
2230 
2231 
2241 static int
2243  const struct GNUNET_HashCode *key,
2244  void *value)
2245 {
2246  struct Operation *op = cls;
2247  struct GNUNET_SETU_ElementMessage *emsg;
2248  struct ElementEntry *ee = value;
2249  struct GNUNET_SETU_Element *el = &ee->element;
2250  struct GNUNET_MQ_Envelope *ev;
2251 
2253  "Sending element %s\n",
2254  GNUNET_h2s (key));
2255 #if MEASURE_PERFORMANCE
2256  perf_store.element_full.received += 1;
2257  perf_store.element_full.received_var_bytes += el->size;
2258 #endif
2259  ev = GNUNET_MQ_msg_extra (emsg,
2260  el->size,
2262  emsg->element_type = htons (el->element_type);
2263  GNUNET_memcpy (&emsg[1],
2264  el->data,
2265  el->size);
2266  GNUNET_MQ_send (op->mq,
2267  ev);
2268  return GNUNET_YES;
2269 }
2270 
2271 
2277 static void
2279 {
2280  struct GNUNET_MQ_Envelope *ev;
2281 
2282  op->phase = PHASE_FULL_SENDING;
2284  "Dedicing to transmit the full set\n");
2285  /* FIXME: use a more memory-friendly way of doing this with an
2286  iterator, just as we do in the non-full case! */
2287 
2288  // Randomize Elements to send
2290  32,GNUNET_NO);
2293  UINT64_MAX);
2295  &
2297  op);
2298 
2302  op);
2303 #if MEASURE_PERFORMANCE
2304  perf_store.full_done.sent += 1;
2305 #endif
2307  GNUNET_MQ_send (op->mq,
2308  ev);
2309 }
2310 
2311 
2318 static int
2320  const struct StrataEstimatorMessage *msg)
2321 {
2322  struct Operation *op = cls;
2323  int is_compressed;
2324  size_t len;
2325 
2326  if (op->phase != PHASE_EXPECT_SE)
2327  {
2328  GNUNET_break (0);
2329  return GNUNET_SYSERR;
2330  }
2331  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2332  msg->header.type));
2333  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2334  if ((GNUNET_NO == is_compressed) &&
2336  {
2337  GNUNET_break (0);
2338  return GNUNET_SYSERR;
2339  }
2340  return GNUNET_OK;
2341 }
2342 
2343 
/*
 * Handle a strata estimator received from the other peer and decide how
 * the set difference is going to be transferred.
 *
 * Visible steps, in order:
 *  - mark this side as the receiving peer (op->peer_site = 1);
 *  - fail unless the phase check against PHASE_EXPECT_SE succeeds;
 *  - fail unless msg->se_count has exactly one bit set and is at most 8;
 *  - record the remote set size in op->remote_element_count and fail if
 *    the (elided) upper-bound condition triggers;
 *  - read the remote estimator from the payload and combine it with
 *    op->se via strata_estimator_difference();
 *  - take diff_remote / diff_local from the first strata, clamp them
 *    against op->byzantine_upper_bound and fail if either is negative;
 *  - if one side reports an empty set, replace the estimate by the exact
 *    element counts;
 *  - compute the average local element size and store the result of
 *    estimate_best_mode_of_operation() in op->mode_of_operation;
 *  - optionally append `diff` to "set.log" when the environment variable
 *    GNUNET_SETU_BENCHMARK equals "1";
 *  - fail if byzantine mode is on and the remote size is below
 *    op->byzantine_lower_bound;
 *  - finally either enter the full-transmission branch (both sub-branches
 *    send a TransmitFullMessage carrying diff_local, the local element
 *    count and diff_remote; one of them then calls send_full_set()) or
 *    send an IBF via send_ibf().
 *
 * NOTE(review): the signature line, all LOG call heads and several
 * conditions/arguments are not present in this listing.
 */
2350 static void
2352  const struct StrataEstimatorMessage *msg)
2353 {
2354 #if MEASURE_PERFORMANCE
2355  perf_store.se.received += 1;
2356  perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2358 #endif
2359  struct Operation *op = cls;
2360  struct MultiStrataEstimator *remote_se;
2361  unsigned int diff;
2362  uint64_t other_size;
2363  size_t len;
2364  int is_compressed;
2366  op->set->content->elements);
2367  // Setting peer site to receiving peer
2368  op->peer_site = 1;
2369 
2373  uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2374  if (GNUNET_OK !=
2375  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2376  {
2377  GNUNET_break (0);
2378  fail_union_operation (op);
2379  return;
2380  }
2381 
 /* Only a count with exactly one bit set and not larger than 8 is accepted. */
2383  if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2384  {
2386  "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2387  msg->se_count);
2388  GNUNET_break_op (0);
2389  fail_union_operation (op);
2390  return;
2391  }
2392 
2393  is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2394  msg->header.type));
2395  GNUNET_STATISTICS_update (_GSS_statistics,
2396  "# bytes of SE received",
2397  ntohs (msg->header.size),
2398  GNUNET_NO);
2399  len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2400  other_size = GNUNET_ntohll (msg->set_size);
2401  op->remote_element_count = other_size;
2402 
2404  {
2406  "Exceeded configured upper bound <%lu> of element: %u\n",
2408  op->remote_element_count);
2409  fail_union_operation (op);
2410  return;
2411  }
2412 
2415  SE_IBF_HASH_NUM);
2416  if (NULL == remote_se)
2417  {
2418  /* insufficient resources, fail */
2419  fail_union_operation (op);
2420  return;
2421  }
2422  if (GNUNET_OK !=
2423  strata_estimator_read (&msg[1],
2424  len,
2425  is_compressed,
2426  msg->se_count,
2428  remote_se))
2429  {
2430  /* decompression failed */
2431  strata_estimator_destroy (remote_se);
2432  fail_union_operation (op);
2433  return;
2434  }
2435  GNUNET_assert (NULL != op->se);
2436  strata_estimator_difference (remote_se,
2437  op->se);
2438 
2439  /* Calculate remote local diff */
2440  long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2441  long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2442 
2443  /* Prevent estimations from overshooting max element */
 /* NOTE(review): these comparisons mix the signed `long` estimates with the
    operation's counters, whose types are not visible here -- confirm the
    implicit conversions behave as intended. */
2444  if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2445  diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2446  if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2447  diff_local = op->byzantine_upper_bound - op->local_element_count;
2448  if ((diff_remote < 0) || (diff_local < 0))
2449  {
2450  strata_estimator_destroy (remote_se);
2452  "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2453  "malicious: remote diff %ld, local diff: %ld\n",
2454  diff_remote, diff_local);
2455  GNUNET_break_op (0);
2456  fail_union_operation (op);
2457  return;
2458  }
2459 
2460  /* Make estimation more precise in initial sync cases */
2461  if (0 == op->remote_element_count)
2462  {
2463  diff_remote = 0;
2464  diff_local = op->local_element_count;
2465  }
2466  if (0 == op->local_element_count)
2467  {
2468  diff_local = 0;
2469  diff_remote = op->remote_element_count;
2470  }
2471 
2472  diff = diff_remote + diff_local;
2473  op->remote_set_diff = diff_remote;
2474 
2476  uint64_t avg_element_size = 0;
2477  if (0 < op->local_element_count)
2478  {
2479  op->total_elements_size_local = 0;
2481  &
2483  op);
2484  avg_element_size = op->total_elements_size_local / op->local_element_count;
2485  }
2486 
2487  op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2489  op->set->content->
2490  elements),
2491  op->
2492  remote_element_count,
2493  diff_remote,
2494  diff_local,
2495  op->
2496  rtt_bandwidth_tradeoff,
2497  op->
2498  ibf_bucket_number_factor);
2499 
2500 #if MEASURE_PERFORMANCE
2501  perf_store.se_diff_local = diff_local;
2502  perf_store.se_diff_remote = diff_remote;
2503  perf_store.se_diff = diff;
2504  perf_store.mode_of_operation = op->mode_of_operation;
2505 #endif
2506 
2507  strata_estimator_destroy (remote_se);
2509  op->se = NULL;
2511  "got se diff=%d, using ibf size %d\n",
2512  diff,
2515 
2516  {
2517  char *set_debug;
2518 
2519  set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2520  if ((NULL != set_debug) &&
2521  (0 == strcmp (set_debug, "1")))
2522  {
 /* NOTE(review): the result of fopen() is used without a NULL check; if
    "set.log" cannot be opened, fprintf()/fclose() receive a NULL stream. */
2523  FILE *f = fopen ("set.log", "a");
2524  fprintf (f, "%llu\n", (unsigned long long) diff);
2525  fclose (f);
2526  }
2527  }
2528 
2529  if ((GNUNET_YES == op->byzantine) &&
2530  (other_size < op->byzantine_lower_bound))
2531  {
2532  GNUNET_break (0);
2533  fail_union_operation (op);
2534  return;
2535  }
2536 
2537  if ((GNUNET_YES == op->force_full) ||
2539  {
2541  "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2542  diff,
2543  (unsigned long long) op->initial_size);
2544  GNUNET_STATISTICS_update (_GSS_statistics,
2545  "# of full sends",
2546  1,
2547  GNUNET_NO);
2549  {
2550  struct TransmitFullMessage *signal_msg;
2551  struct GNUNET_MQ_Envelope *ev;
2552  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2554  signal_msg->remote_set_difference = htonl (diff_local);
2555  signal_msg->remote_set_size = htonl (op->local_element_count);
2556  signal_msg->local_set_difference = htonl (diff_remote);
2557  GNUNET_MQ_send (op->mq,
2558  ev);
2559  send_full_set (op);
2560  }
2561  else
2562  {
2563  struct GNUNET_MQ_Envelope *ev;
2564 
2566  "Telling other peer that we expect its full set\n");
2568 #if MEASURE_PERFORMANCE
2569  perf_store.request_full.sent += 1;
2570 #endif
2571  struct TransmitFullMessage *signal_msg;
2572  ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2574  signal_msg->remote_set_difference = htonl (diff_local);
2575  signal_msg->remote_set_size = htonl (op->local_element_count);
2576  signal_msg->local_set_difference = htonl (diff_remote);
2577  GNUNET_MQ_send (op->mq,
2578  ev);
2579  }
2580  }
2581  else
2582  {
2583  GNUNET_STATISTICS_update (_GSS_statistics,
2584  "# of ibf sends",
2585  1,
2586  GNUNET_NO);
2587  if (GNUNET_OK !=
2588  send_ibf (op,
2592  {
2593  /* Internal error, best we can do is shut the connection */
2595  "Failed to send IBF, closing connection\n");
2596  fail_union_operation (op);
2597  return;
2598  }
2599  }
2601 }
2602 
2603 
/*
 * Map-iterator callback that offers one element to the other peer.
 *
 * `cls` is a struct SendElementClosure (operation plus the full IBF key
 * being processed); `value` is the struct KeyEntry found under the 32-bit
 * map key.
 *
 * If the entry's full key differs from the closure's key, the callback
 * sets op->active_passive_switch_required and skips the entry.  Otherwise
 * it runs three message-control checks on the element hash (their call
 * heads are not present in this listing): the first skips the entry, the
 * other two fail the operation and stop the iteration.  If all pass, a
 * message whose payload is the element hash is queued on op->mq.
 *
 * Returns GNUNET_YES to continue iterating, GNUNET_NO after the operation
 * has been failed.
 */
2611 static int
2613  uint32_t key,
2614  void *value)
2615 {
2616  struct SendElementClosure *sec = cls;
2617  struct Operation *op = sec->op;
2618  struct KeyEntry *ke = value;
2619  struct GNUNET_MQ_Envelope *ev;
2620  struct GNUNET_MessageHeader *mh;
2621 
2622  /* Detect 32-bit key collision for the 64-bit IBF keys. */
2623  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2624  {
2625  op->active_passive_switch_required = true;
2626  return GNUNET_YES;
2627  }
2628 
2629  /* Prevent the implementation from sending an offer multiple times in case of a role switch */
2630  if (GNUNET_YES ==
2633  &ke->element->element_hash,
2634  OFFER_MESSAGE)
2635  )
2636  {
2638  "Skipping already sent processed element offer!\n");
2639  return GNUNET_YES;
2640  }
2641 
2642  /* Save send offer message for message control */
2643  if (GNUNET_YES !=
2646  MSG_CFS_SENT,
2647  &ke->element->element_hash,
2648  OFFER_MESSAGE)
2649  )
2650  {
2652  "Double offer message sent found!\n");
2653  GNUNET_break (0);
2654  fail_union_operation (op);
2655  return GNUNET_NO;
2656  }
2657  ;
2658 
2659  /* Mark the element as one we expect to receive a message for */
2660  if (GNUNET_YES !=
2664  &ke->element->element_hash,
2666  )
2667  {
2669  "Double demand received found!\n");
2670  GNUNET_break (0);
2671  fail_union_operation (op);
2672  return GNUNET_NO;
2673  }
2674  ;
2675 #if MEASURE_PERFORMANCE
2676  perf_store.offer.sent += 1;
2677  perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2678 #endif
2679  ev = GNUNET_MQ_msg_header_extra (mh,
2680  sizeof(struct GNUNET_HashCode),
2682  GNUNET_assert (NULL != ev);
 /* The payload directly after the header is the element's hash. */
2683  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2685  "[OP %p] sending element offer (%s) to peer\n",
2686  op,
2687  GNUNET_h2s (&ke->element->element_hash));
2688  GNUNET_MQ_send (op->mq, ev);
2689  return GNUNET_YES;
2690 }
2691 
2692 
/*
 * Send offers for all elements stored under the given IBF key.
 *
 * Packs `ibf_key` and `op` into a struct SendElementClosure and passes it,
 * together with op->key_to_element and the key value truncated to 32 bits,
 * to a call whose head and callback argument are not present in this
 * listing.
 *
 * ibf_key: IBF key for which offers are to be sent.
 */
2699 void
2701  struct IBF_Key ibf_key)
2702 {
2703  struct SendElementClosure send_cls;
2704 
2705  send_cls.ibf_key = ibf_key;
2706  send_cls.op = op;
2708  op->key_to_element,
2709  (uint32_t) ibf_key.
2710  key_val,
2712  &send_cls);
2713 }
2714 
2715 
/*
 * Decode the difference between the local and the remote IBF and react to
 * every decoded key.
 *
 * A local IBF of the same size as op->remote_ibf is prepared, duplicated
 * and the remote IBF subtracted from the copy; op->remote_ibf is then
 * destroyed and reset to NULL.  The copy is decoded in a loop:
 *  - decode error, more keys decoded than buckets, or the same key twice
 *    in a row: compute a larger IBF size (at least IBF_MIN_SIZE | 1) and
 *    resend via send_ibf() if it does not exceed MAX_IBF_SIZE; otherwise
 *    fail the operation;
 *  - GNUNET_NO from ibf_decode(): everything was decoded, the envelope
 *    `ev` is sent and the loop ends;
 *  - side == 1: unsalt the key and call send_offers_for_key();
 *  - side == -1: send an InquiryMessage carrying the key;
 *  - any other side value is asserted to be impossible.
 *
 * Returns GNUNET_OK on success (including the "resend larger IBF" case),
 * GNUNET_SYSERR if preparing the IBF failed or the operation was failed.
 *
 * NOTE(review): on the two failure paths inside the loop this function
 * already calls fail_union_operation() before returning GNUNET_SYSERR;
 * the caller that invokes decode_and_send() on a complete IBF calls
 * fail_union_operation() again for any non-OK result -- confirm failing
 * the same operation twice is safe.
 * NOTE(review): the signature line, LOG call heads and some statements
 * (e.g. the one creating `ev` for the final message) are not present in
 * this listing.
 */
2723 static int
2725 {
2726  struct IBF_Key key;
2727  struct IBF_Key last_key;
2728  int side;
2729  unsigned int num_decoded;
2730  struct InvertibleBloomFilter *diff_ibf;
2731 
2733 
2734  if (GNUNET_OK !=
2735  prepare_ibf (op,
2736  op->remote_ibf->size))
2737  {
2738  GNUNET_break (0);
2739  /* allocation failed */
2740  return GNUNET_SYSERR;
2741  }
2742 
2743  diff_ibf = ibf_dup (op->local_ibf);
2744  ibf_subtract (diff_ibf,
2745  op->remote_ibf);
2746 
2747  ibf_destroy (op->remote_ibf);
2748  op->remote_ibf = NULL;
2749 
2751  "decoding IBF (size=%u)\n",
2752  diff_ibf->size);
2753 
2754  num_decoded = 0;
2755  key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2756 
2757  while (1)
2758  {
2759  int res;
2760  int cycle_detected = GNUNET_NO;
2761 
2762  last_key = key;
2763 
2764  res = ibf_decode (diff_ibf,
2765  &side,
2766  &key);
2767  if (res == GNUNET_OK)
2768  {
2770  "decoded ibf key %lx\n",
2771  (unsigned long) key.key_val);
2772  num_decoded += 1;
 /* A cycle is assumed when more keys than buckets were decoded or the
    same key is produced twice in a row. */
2773  if ((num_decoded > diff_ibf->size) ||
2774  ((num_decoded > 1) &&
2775  (last_key.key_val == key.key_val)))
2776  {
2778  "detected cyclic ibf (decoded %u/%u)\n",
2779  num_decoded,
2780  diff_ibf->size);
2781  cycle_detected = GNUNET_YES;
2782  }
2783  }
2784  if ((GNUNET_SYSERR == res) ||
2785  (GNUNET_YES == cycle_detected))
2786  {
2787  uint32_t next_size;
2790  next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2791  diff_ibf->size);
2794  uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2795 
2796  if (next_size<ibf_min_size)
2797  next_size = ibf_min_size;
2798 
2799 
2800  if (next_size <= MAX_IBF_SIZE)
2801  {
2803  "decoding failed, sending larger ibf (size %u)\n",
2804  next_size);
2805  GNUNET_STATISTICS_update (_GSS_statistics,
2806  "# of IBF retries",
2807  1,
2808  GNUNET_NO);
2809 #if MEASURE_PERFORMANCE
2810  perf_store.active_passive_switches += 1;
2811 #endif
2812 
 /* salt_send receives the current salt_receive; salt_receive is
    incremented afterwards (post-increment). */
2813  op->salt_send = op->salt_receive++;
2814 
2815  if (GNUNET_OK !=
2816  send_ibf (op, next_size))
2817  {
2818  /* Internal error, best we can do is shut the connection */
2820  "Failed to send IBF, closing connection\n");
2821  fail_union_operation (op);
2822  ibf_destroy (diff_ibf);
2823  return GNUNET_SYSERR;
2824  }
2825  }
2826  else
2827  {
2828  GNUNET_STATISTICS_update (_GSS_statistics,
2829  "# of failed union operations (too large)",
2830  1,
2831  GNUNET_NO);
2832  // XXX: Send the whole set, element-by-element
2834  "set union failed: reached ibf limit\n");
2835  fail_union_operation (op);
2836  ibf_destroy (diff_ibf);
2837  return GNUNET_SYSERR;
2838  }
2839  break;
2840  }
2841  if (GNUNET_NO == res)
2842  {
2843  struct GNUNET_MQ_Envelope *ev;
2844 
2846  "transmitted all values, sending DONE\n");
2847 
2848 #if MEASURE_PERFORMANCE
2849  perf_store.done.sent += 1;
2850 #endif
2852  GNUNET_MQ_send (op->mq, ev);
2853  /* We now wait until we get a DONE message back
2854  * and then wait for our MQ to be flushed and all our
2855  * demands be delivered. */
2856  break;
2857  }
2858  if (1 == side)
2859  {
2860  struct IBF_Key unsalted_key;
2861  unsalt_key (&key,
2862  op->salt_receive,
2863  &unsalted_key);
2864  send_offers_for_key (op,
2865  unsalted_key);
2866  }
2867  else if (-1 == side)
2868  {
2869  struct GNUNET_MQ_Envelope *ev;
2870  struct InquiryMessage *msg;
2871 
2872 #if MEASURE_PERFORMANCE
2873  perf_store.inquery.sent += 1;
2874  perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2875 #endif
2876 
2878  struct GNUNET_HashContext *hashed_key_context =
2880  struct GNUNET_HashCode *hashed_key = (struct
2882  sizeof(struct GNUNET_HashCode));
2884  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2885  &key,
2886  sizeof(struct IBF_Key));
2887  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2888  hashed_key);
2890  hashed_key,
2891  &mcfs,
2893  );
2894 
2895  /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2896  * the effort additional complexity. */
2897  ev = GNUNET_MQ_msg_extra (msg,
2898  sizeof(struct IBF_Key),
2900  msg->salt = htonl (op->salt_receive);
2901  GNUNET_memcpy (&msg[1],
2902  &key,
2903  sizeof(struct IBF_Key));
2905  "sending element inquiry for IBF key %lx\n",
2906  (unsigned long) key.key_val);
2907  GNUNET_MQ_send (op->mq, ev);
2908  }
2909  else
2910  {
2911  GNUNET_assert (0);
2912  }
2913  }
2914  ibf_destroy (diff_ibf);
2915  return GNUNET_OK;
2916 }
2917 
2918 
/*
 * Validation hook for a TransmitFullMessage.
 *
 * Performs no checks: every message is accepted and GNUNET_OK is returned.
 */
2926 static int
2928  const struct TransmitFullMessage *msg)
2929 {
2930  return GNUNET_OK;
2931 }
2932 
2933 
/*
 * Handle a TransmitFullMessage announcing that the other peer sends its
 * full set first.
 *
 * Visible steps:
 *  - fail unless the phase check against PHASE_EXPECT_IBF succeeds;
 *  - copy the remote set size and both difference estimates from the
 *    message (network to host order) into the operation;
 *  - fail if check_byzantine_bounds() does not return GNUNET_OK;
 *  - compute the average local element size and re-run
 *    estimate_best_mode_of_operation() locally;
 *  - fail with a protocol violation unless the result is
 *    FULL_SYNC_LOCAL_SENDING_FIRST.
 *
 * NOTE(review): the signature line, LOG call heads, several arguments and
 * the statement(s) just before the closing brace are not present in this
 * listing.
 */
2940 static void
2942  const struct TransmitFullMessage *msg)
2943 {
2944  struct Operation *op = cls;
2945 
2949  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2950  if (GNUNET_OK !=
2951  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2952  {
2953  GNUNET_break (0);
2954  fail_union_operation (op);
2955  return;
2956  }
2957 
2959  op->remote_element_count = ntohl (msg->remote_set_size);
2960  op->remote_set_diff = ntohl (msg->remote_set_difference);
2961  op->local_set_diff = ntohl (msg->local_set_difference);
2962 
2964  if (check_byzantine_bounds (op) != GNUNET_OK)
2965  {
2967  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2968  "criteria\n");
2969  GNUNET_break_op (0);
2970  fail_union_operation (op);
2971  return;
2972  }
2973 
2976  op->set->content->elements);
2977  uint64_t avg_element_size = 0;
2978  if (0 < op->local_element_count)
2979  {
2980  op->total_elements_size_local = 0;
2982  &
2984  op);
2985  avg_element_size = op->total_elements_size_local / op->local_element_count;
2986  }
2987 
2989  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2990  op->
2992  op->
2994  op->local_set_diff,
2995  op->remote_set_diff,
2996  op->
2998  op->
3000  if (FULL_SYNC_LOCAL_SENDING_FIRST != mode_of_operation)
3001  {
3003  "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3004  " : %d\n", mode_of_operation);
3005  GNUNET_break_op (0);
3006  fail_union_operation (op);
3007  return;
3008  }
3010 }
3011 
3012 
/*
 * Validate an IBFMessage before it is handled.
 *
 * The payload after the fixed header must contain at least one bucket and
 * be an exact multiple of IBF_BUCKET_SIZE.  In PHASE_EXPECT_IBF_LAST the
 * message must continue the IBF currently being received: its offset must
 * equal op->ibf_buckets_received, its size must match op->remote_ibf->size
 * and its salt must equal op->salt_receive.  In any other phase only
 * PHASE_PASSIVE_DECODING and PHASE_EXPECT_IBF are accepted.
 *
 * Returns GNUNET_OK if the message is well-formed, GNUNET_SYSERR
 * otherwise.
 *
 * NOTE(review): msg->ibf_size is compared without ntohl(), while offset
 * and salt of the same message are converted -- confirm the byte order of
 * ibf_size on the wire.
 */
3023 static int
3025  const struct IBFMessage *msg)
3026 {
3027  struct Operation *op = cls;
3028  unsigned int buckets_in_message;
3029 
3030  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3031  / IBF_BUCKET_SIZE;
3032  if (0 == buckets_in_message)
3033  {
3034  GNUNET_break_op (0);
3035  return GNUNET_SYSERR;
3036  }
 /* Reject payloads that are not a whole number of buckets. */
3037  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3038  * IBF_BUCKET_SIZE)
3039  {
3040  GNUNET_break_op (0);
3041  return GNUNET_SYSERR;
3042  }
3043  if (op->phase == PHASE_EXPECT_IBF_LAST)
3044  {
3045  if (ntohl (msg->offset) != op->ibf_buckets_received)
3046  {
3047  GNUNET_break_op (0);
3048  return GNUNET_SYSERR;
3049  }
3050 
3051  if (msg->ibf_size != op->remote_ibf->size)
3052  {
3053  GNUNET_break_op (0);
3054  return GNUNET_SYSERR;
3055  }
3056  if (ntohl (msg->salt) != op->salt_receive)
3057  {
3058  GNUNET_break_op (0);
3059  return GNUNET_SYSERR;
3060  }
3061  }
3062  else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3063  (op->phase != PHASE_EXPECT_IBF))
3064  {
3065  GNUNET_break_op (0);
3066  return GNUNET_SYSERR;
3067  }
3068 
3069  return GNUNET_OK;
3070 }
3071 
3072 
/*
 * Handle one slice of an IBF sent by the other peer.
 *
 * After the phase check, op->active_passive_switch_required is cleared.
 * In PHASE_PASSIVE_DECODING or PHASE_EXPECT_IBF the message starts a new
 * IBF: op->remote_ibf must be NULL, a fresh IBF is created, the salt is
 * stored in op->salt_receive, the received-bucket counter is reset and the
 * message offset must be 0.  In the remaining accepted phase the message
 * continues the IBF already being received.  The buckets are read into
 * op->remote_ibf and, once op->ibf_buckets_received equals the IBF size,
 * decode_and_send() is invoked.
 *
 * NOTE(review): the log line prints ntohl (msg->ibf_size) but ibf_create()
 * receives msg->ibf_size unconverted -- confirm which byte order is
 * intended.
 * NOTE(review): decode_and_send() fails the operation itself on some
 * paths, and this handler calls fail_union_operation() again when it
 * returns non-OK -- confirm that is safe.
 * NOTE(review): the signature line, LOG call heads and some arguments and
 * statements are not present in this listing.
 */
3082 static void
3084  const struct IBFMessage *msg)
3085 {
3086  struct Operation *op = cls;
3087  unsigned int buckets_in_message;
3091  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3093  if (GNUNET_OK !=
3094  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3095  {
3096  GNUNET_break (0);
3097  fail_union_operation (op);
3098  return;
3099  }
3102  op->active_passive_switch_required = false;
3103 
3104 #if MEASURE_PERFORMANCE
3105  perf_store.ibf.received += 1;
3106  perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3107 #endif
3108 
3109  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3110  / IBF_BUCKET_SIZE;
3111  if ((op->phase == PHASE_PASSIVE_DECODING) ||
3112  (op->phase == PHASE_EXPECT_IBF))
3113  {
3115  GNUNET_assert (NULL == op->remote_ibf);
3117  "Creating new ibf of size %u\n",
3118  ntohl (msg->ibf_size));
3119  // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3120  op->remote_ibf = ibf_create (msg->ibf_size,
3121  ((uint8_t) op->ibf_number_buckets_per_element));
3122  op->salt_receive = ntohl (msg->salt);
3124  "Receiving new IBF with salt %u\n",
3125  op->salt_receive);
3126  if (NULL == op->remote_ibf)
3127  {
3129  "Failed to parse remote IBF, closing connection\n");
3130  fail_union_operation (op);
3131  return;
3132  }
3133  op->ibf_buckets_received = 0;
3134  if (0 != ntohl (msg->offset))
3135  {
3136  GNUNET_break_op (0);
3137  fail_union_operation (op);
3138  return;
3139  }
3140  }
3141  else
3142  {
3145  "Received more of IBF\n");
3146  }
3147  GNUNET_assert (NULL != op->remote_ibf);
3148 
3149  ibf_read_slice (&msg[1],
3151  buckets_in_message,
3153  op->ibf_buckets_received += buckets_in_message;
3154 
 /* All buckets have arrived: decode the IBF and act on the result. */
3155  if (op->ibf_buckets_received == op->remote_ibf->size)
3156  {
3158  "received full ibf\n");
3160  if (GNUNET_OK !=
3161  decode_and_send (op))
3162  {
3163  /* Internal error, best we can do is shut down */
3165  "Failed to decode IBF, closing connection\n");
3166  fail_union_operation (op);
3167  return;
3168  }
3169  }
3171 }
3172 
3173 
/*
 * Forward one element, together with a result status, to the client that
 * owns the set.
 *
 * Builds a GNUNET_SETU_ResultMessage with element->size bytes of payload,
 * fills in the status, the client's request id and the element type,
 * copies the element data behind the header and queues the message on
 * op->set->cs->mq.  op->client_request_id must be non-zero.
 *
 * element: element to deliver; its data, size and element_type are read.
 *
 * NOTE(review): when the envelope is NULL, GNUNET_MQ_discard() is still
 * called with that NULL envelope before returning -- confirm the discard
 * is harmless or drop it.
 * NOTE(review): the signature line (including the status parameter) and
 * a few other lines are not present in this listing.
 */
3182 static void
3184  const struct GNUNET_SETU_Element *element,
3186 {
3187  struct GNUNET_MQ_Envelope *ev;
3188  struct GNUNET_SETU_ResultMessage *rm;
3189 
3191  "sending element (size %u) to client\n",
3192  element->size);
3193  GNUNET_assert (0 != op->client_request_id);
3194  ev = GNUNET_MQ_msg_extra (rm,
3195  element->size,
3197  if (NULL == ev)
3198  {
3199  GNUNET_MQ_discard (ev);
3200  GNUNET_break (0);
3201  return;
3202  }
3203  rm->result_status = htons (status);
3204  rm->request_id = htonl (op->client_request_id);
3205  rm->element_type = htons (element->element_type);
3207  op->key_to_element));
3208  GNUNET_memcpy (&rm[1],
3209  element->data,
3210  element->size);
3211  GNUNET_MQ_send (op->set->cs->mq,
3212  ev);
3213 }
3214 
3215 
/*
 * Check whether the operation can be completed and, if so, complete it.
 *
 * The number of entries in op->demanded_hashes is read for logging, and a
 * multihashmap iteration (its map and callback arguments are not present
 * in this listing) yields `send_done`.  A value of -1 leaves the phase
 * unchanged.  Otherwise:
 *  - in PHASE_FINISH_WAITING the phase becomes PHASE_FINISHED and the
 *    envelope `ev` is sent on op->mq;
 *  - in PHASE_FINISH_CLOSING the phase becomes PHASE_FINISHED and
 *    send_client_done() is called.
 *
 * NOTE(review): the statement that creates `ev` is not present in this
 * listing.
 */
3221 static void
3223 {
3224  unsigned int num_demanded;
3225 
3226  num_demanded = GNUNET_CONTAINER_multihashmap_size (
3227  op->demanded_hashes);
3228  int send_done = GNUNET_CONTAINER_multihashmap_iterate (
3230  &
3232  op);
3233  if (PHASE_FINISH_WAITING == op->phase)
3234  {
3236  "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3237  num_demanded, op->peer_site);
3238  if (-1 != send_done)
3239  {
3240  struct GNUNET_MQ_Envelope *ev;
3241 
3242  op->phase = PHASE_FINISHED;
3243 #if MEASURE_PERFORMANCE
3244  perf_store.done.sent += 1;
3245 #endif
3247  GNUNET_MQ_send (op->mq,
3248  ev);
3249  /* We now wait until the other peer sends P2P_OVER
3250  * after it got all elements from us. */
3251  }
3252  }
3253  if (PHASE_FINISH_CLOSING == op->phase)
3254  {
3256  "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3257  num_demanded, op->peer_site);
3258  if (-1 != send_done)
3259  {
3260  op->phase = PHASE_FINISHED;
3261  send_client_done (op);
3263  }
3264  }
3265 }
3266 
3267 
/*
 * Validate a GNUNET_SETU_ElementMessage before it is handled.
 *
 * Returns GNUNET_SYSERR when the guarding condition holds and GNUNET_OK
 * otherwise.
 *
 * NOTE(review): the condition line itself (and the signature line) is not
 * present in this listing, so what exactly is being checked cannot be
 * stated here.
 */
3274 static int
3276  const struct GNUNET_SETU_ElementMessage *emsg)
3277 {
3278  struct Operation *op = cls;
3279 
3281  {
3282  GNUNET_break_op (0);
3283  return GNUNET_SYSERR;
3284  }
3285  return GNUNET_OK;
3286 }
3287 
3288 
/*
 * Handle an element sent by the other peer during IBF-based
 * synchronisation.
 *
 * After the phase check, the payload is copied into a freshly allocated
 * struct ElementEntry (marked as remote) and hashed.  Two checks on the
 * element hash follow (their call heads are not present in this listing);
 * the first fails the operation for elements that were not demanded, the
 * second for elements received more than once.  Statistics and
 * op->received_total are updated.  If the element is already known, its
 * key entry is marked as received and the new entry is freed; otherwise
 * it is registered with the operation and forwarded to the client.
 * The operation is failed if more than 8 elements were received and fewer
 * than a third of them were new.  Finally maybe_finish() is called.
 *
 * NOTE(review): on the two early-return failure paths that follow the
 * allocation of `ee`, the visible code does not free `ee`.
 */
3297 static void
3299  const struct GNUNET_SETU_ElementMessage *emsg)
3300 {
3301  struct Operation *op = cls;
3302  struct ElementEntry *ee;
3303  struct KeyEntry *ke;
3304  uint16_t element_size;
3305 
3309  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3311  if (GNUNET_OK !=
3312  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3313  {
3314  GNUNET_break (0);
3315  fail_union_operation (op);
3316  return;
3317  }
3318 
3319  element_size = ntohs (emsg->header.size) - sizeof(struct
3321 #if MEASURE_PERFORMANCE
3322  perf_store.element.received += 1;
3323  perf_store.element.received_var_bytes += element_size;
3324 #endif
3325 
 /* The element data is stored directly behind the ElementEntry header. */
3326  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3327  GNUNET_memcpy (&ee[1],
3328  &emsg[1],
3329  element_size);
3330  ee->element.size = element_size;
3331  ee->element.data = &ee[1];
3332  ee->element.element_type = ntohs (emsg->element_type);
3333  ee->remote = GNUNET_YES;
3335  &ee->element_hash);
3336  if (GNUNET_NO ==
3338  &ee->element_hash,
3339  NULL))
3340  {
3341  /* We got something we didn't demand, since it's not in our map. */
3342  GNUNET_break_op (0);
3343  fail_union_operation (op);
3344  return;
3345  }
3346 
3347  if (GNUNET_OK !=
3351  &ee->element_hash,
3353  )
3354  {
3356  "An element has been received more than once!\n");
3357  GNUNET_break (0);
3358  fail_union_operation (op);
3359  return;
3360  }
3361 
3363  "Got element (size %u, hash %s) from peer\n",
3364  (unsigned int) element_size,
3365  GNUNET_h2s (&ee->element_hash));
3366 
3367  GNUNET_STATISTICS_update (_GSS_statistics,
3368  "# received elements",
3369  1,
3370  GNUNET_NO);
3371  GNUNET_STATISTICS_update (_GSS_statistics,
3372  "# exchanged elements",
3373  1,
3374  GNUNET_NO);
3375 
3376  op->received_total++;
3377 
3378  ke = op_get_element (op,
3379  &ee->element_hash);
3380  if (NULL != ke)
3381  {
3382  /* Got repeated element. Should not happen since
3383  * we track demands. */
3384  GNUNET_STATISTICS_update (_GSS_statistics,
3385  "# repeated elements",
3386  1,
3387  GNUNET_NO);
3388  ke->received = GNUNET_YES;
3389  GNUNET_free (ee);
3390  }
3391  else
3392  {
3394  "Registering new element from remote peer\n");
3395  op->received_fresh++;
3396  op_register_element (op, ee, GNUNET_YES);
3397  /* only send results immediately if the client wants it */
3398  send_client_element (op,
3399  &ee->element,
3401  }
3402 
3403  if ((op->received_total > 8) &&
3404  (op->received_fresh < op->received_total / 3))
3405  {
3406  /* The other peer gave us lots of old elements, there's something wrong. */
3407  GNUNET_break_op (0);
3408  fail_union_operation (op);
3409  return;
3410  }
3412  maybe_finish (op);
3413 }
3414 
3415 
/*
 * Validation hook for an element message received during full-set
 * transmission.
 *
 * Currently performs no checks (see the FIXME below): the operation
 * pointer is deliberately unused and GNUNET_OK is always returned.
 */
3422 static int
3424  const struct GNUNET_SETU_ElementMessage *emsg)
3425 {
3426  struct Operation *op = cls;
3427 
3428  (void) op;
3429 
3430  // FIXME: check that we expect full elements here?
3431  return GNUNET_OK;
3432 }
3433 
3434 
/*
 * Handle an element received while the other peer transmits its full set.
 *
 * After the phase check (PHASE_FULL_RECEIVING or PHASE_FULL_SENDING), the
 * payload is copied into a freshly allocated struct ElementEntry (marked
 * as remote) and hashed.  Statistics and op->received_total are updated.
 * If the element is already known, its key entry is marked as received
 * and the new entry is freed; otherwise it is registered with the
 * operation and forwarded to the client.  In byzantine mode the operation
 * is failed as soon as more elements were received than the other peer
 * announced in op->remote_element_count.
 *
 * NOTE(review): the signature line, LOG call heads and some statements
 * are not present in this listing.
 */
3441 static void
3443  const struct GNUNET_SETU_ElementMessage *emsg)
3444 {
3445  struct Operation *op = cls;
3446  struct ElementEntry *ee;
3447  struct KeyEntry *ke;
3448  uint16_t element_size;
3449 
3453  uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3454  if (GNUNET_OK !=
3455  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3456  {
3457  GNUNET_break (0);
3458  fail_union_operation (op);
3459  return;
3460  }
3461 
3462  element_size = ntohs (emsg->header.size)
3463  - sizeof(struct GNUNET_SETU_ElementMessage);
3464 
3465 #if MEASURE_PERFORMANCE
3466  perf_store.element_full.received += 1;
3467  perf_store.element_full.received_var_bytes += element_size;
3468 #endif
3469 
 /* The element data is stored directly behind the ElementEntry header. */
3470  ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3471  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3472  ee->element.size = element_size;
3473  ee->element.data = &ee[1];
3474  ee->element.element_type = ntohs (emsg->element_type);
3475  ee->remote = GNUNET_YES;
3477  &ee->element_hash);
3479  "Got element (full diff, size %u, hash %s) from peer\n",
3480  (unsigned int) element_size,
3481  GNUNET_h2s (&ee->element_hash));
3482 
3483  GNUNET_STATISTICS_update (_GSS_statistics,
3484  "# received elements",
3485  1,
3486  GNUNET_NO);
3487  GNUNET_STATISTICS_update (_GSS_statistics,
3488  "# exchanged elements",
3489  1,
3490  GNUNET_NO);
3491 
3492  op->received_total++;
3493  ke = op_get_element (op,
3494  &ee->element_hash);
3495  if (NULL != ke)
3496  {
3497  GNUNET_STATISTICS_update (_GSS_statistics,
3498  "# repeated elements",
3499  1,
3500  GNUNET_NO);
3502  ke->received = GNUNET_YES;
3503  GNUNET_free (ee);
3504  }
3505  else
3506  {
3508  "Registering new element from remote peer\n");
3509  op->received_fresh++;
3510  op_register_element (op, ee, GNUNET_YES);
3511  /* only send results immediately if the client wants it */
3512  send_client_element (op,
3513  &ee->element,
3515  }
3516 
3517 
3518  if ((GNUNET_YES == op->byzantine) &&
3519  (op->received_total > op->remote_element_count) )
3520  {
3521  /* The other peer sent more elements than it announced, there's something wrong. */
3523  "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3524  (unsigned long long) op->received_total,
3525  (unsigned long long) op->remote_element_count);
3526  GNUNET_break_op (0);
3527  fail_union_operation (op);
3528  return;
3529  }
3531 }
3532 
3533 
/*
 * Validate an InquiryMessage before it is handled.
 *
 * The operation must be in PHASE_PASSIVE_DECODING and the payload after
 * the fixed header must be an exact multiple of sizeof(struct IBF_Key).
 *
 * Returns GNUNET_OK if the message is well-formed, GNUNET_SYSERR
 * otherwise.
 *
 * NOTE(review): only PHASE_PASSIVE_DECODING passes here, while the
 * inquiry handler's own phase list also names PHASE_ACTIVE_DECODING --
 * confirm which of the two is intended.
 */
3541 static int
3543  const struct InquiryMessage *msg)
3544 {
3545  struct Operation *op = cls;
3546  unsigned int num_keys;
3547 
3548  if (op->phase != PHASE_PASSIVE_DECODING)
3549  {
3550  GNUNET_break_op (0);
3551  return GNUNET_SYSERR;
3552  }
3553  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3554  / sizeof(struct IBF_Key);
 /* Reject payloads that are not a whole number of IBF keys. */
3555  if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3556  != num_keys * sizeof(struct IBF_Key))
3557  {
3558  GNUNET_break_op (0);
3559  return GNUNET_SYSERR;
3560  }
3561  return GNUNET_OK;
3562 }
3563 
3564 
/*
 * Handle an inquiry: the other peer lists IBF keys it wants offers for.
 *
 * After the phase check (PHASE_ACTIVE_DECODING or PHASE_PASSIVE_DECODING),
 * a hash is computed and passed, together with `mcfs`, to a call whose
 * head is not present in this listing.  Then, for every key in the
 * payload, the key is unsalted with the salt carried in the message and
 * send_offers_for_key() is called.
 *
 * NOTE(review): the signature line, LOG call heads and some statements
 * (e.g. the creation of the hash context) are not present in this
 * listing.
 */
3571 static void
3573  const struct InquiryMessage *msg)
3574 {
3575  struct Operation *op = cls;
3576  const struct IBF_Key *ibf_key;
3577  unsigned int num_keys;
3578 
3582  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3583  if (GNUNET_OK !=
3584  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3585  {
3586  GNUNET_break (0);
3587  fail_union_operation (op);
3588  return;
3589  }
3590 
3591 #if MEASURE_PERFORMANCE
3592  perf_store.inquery.received += 1;
3593  perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3594  - sizeof(struct InquiryMessage));
3595 #endif
3596 
3598  "Received union inquiry\n");
3599  num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3600  / sizeof(struct IBF_Key);
3601  ibf_key = (const struct IBF_Key *) &msg[1];
3602 
3604  struct GNUNET_HashContext *hashed_key_context =
3606  struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3607  sizeof(struct GNUNET_HashCode));;
 /* NOTE(review): `ibf_key` is already a pointer to the first key, so
    `&ibf_key` feeds the bytes of the pointer variable itself into the
    hash rather than the key it points to -- `ibf_key` looks intended. */
3609  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3610  &ibf_key,
3611  sizeof(struct IBF_Key));
3612  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3613  hashed_key);
3615  hashed_key,
3616  &mcfs,
3618  );
3619 
 /* Walk over all keys contained in the payload. */
3620  while (0 != num_keys--)
3621  {
3622  struct IBF_Key unsalted_key;
3623  unsalt_key (ibf_key,
3624  ntohl (msg->salt),
3625  &unsalted_key);
3626  send_offers_for_key (op,
3627  unsalted_key);
3628  ibf_key++;
3629  }
3631 }
3632 
3633 
/*
 * Map-iterator callback that sends one locally known element to the other
 * peer unless that element was received from it.
 *
 * `cls` is the operation, `value` the struct KeyEntry being visited.
 * Entries whose `received` flag is GNUNET_YES are skipped.  For all other
 * entries a GNUNET_SETU_ElementMessage carrying the element data and type
 * is queued on op->mq.
 *
 * Always returns GNUNET_YES so that the iteration continues.
 *
 * NOTE(review): under MEASURE_PERFORMANCE the counter
 * perf_store.element_full.received is incremented although this path
 * sends an element -- a `sent` counter may have been intended.
 * NOTE(review): the signature line and the message-type argument are not
 * present in this listing.
 */
3644 static int
3646  uint32_t key,
3647  void *value)
3648 {
3649  struct Operation *op = cls;
3650  struct KeyEntry *ke = value;
3651  struct GNUNET_MQ_Envelope *ev;
3652  struct GNUNET_SETU_ElementMessage *emsg;
3653  struct ElementEntry *ee = ke->element;
3654 
3655  if (GNUNET_YES == ke->received)
3656  return GNUNET_YES;
3657 #if MEASURE_PERFORMANCE
3658  perf_store.element_full.received += 1;
3659 #endif
3660  ev = GNUNET_MQ_msg_extra (emsg,
3661  ee->element.size,
3663  GNUNET_memcpy (&emsg[1],
3664  ee->element.data,
3665  ee->element.size);
3666  emsg->element_type = htons (ee->element.element_type);
3667  GNUNET_MQ_send (op->mq,
3668  ev);
3669  return GNUNET_YES;
3670 }
3671 
3672 
/*
 * Validation hook for a TransmitFullMessage.
 *
 * Performs no checks: every message is accepted and GNUNET_OK is returned.
 */
3679 static int
3681  const struct TransmitFullMessage *mh)
3682 {
3683  return GNUNET_OK;
3684 }
3685 
3686 
/*
 * Handle a request from the other peer to transmit our full set.
 *
 * Visible steps:
 *  - fail unless the phase check against PHASE_EXPECT_IBF succeeds;
 *  - copy the remote set size and both difference estimates from the
 *    message (network to host order) into the operation;
 *  - fail if check_byzantine_bounds() does not return GNUNET_OK;
 *  - compute the average local element size and re-run
 *    estimate_best_mode_of_operation() locally;
 *  - fail with a protocol violation unless the result is
 *    FULL_SYNC_REMOTE_SENDING_FIRST;
 *  - otherwise call send_full_set().
 *
 * NOTE(review): the signature line, LOG call heads and several arguments
 * are not present in this listing.
 */
3687 static void
3689  const struct TransmitFullMessage *msg)
3690 {
3691  struct Operation *op = cls;
3692 
3696  uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3697  if (GNUNET_OK !=
3698  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3699  {
3700  GNUNET_break (0);
3701  fail_union_operation (op);
3702  return;
3703  }
3704 
3705  op->remote_element_count = ntohl (msg->remote_set_size);
3706  op->remote_set_diff = ntohl (msg->remote_set_difference);
3707  op->local_set_diff = ntohl (msg->local_set_difference);
3708 
3709 
3710  if (check_byzantine_bounds (op) != GNUNET_OK)
3711  {
3713  "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3714  "criteria\n");
3715  GNUNET_break_op (0);
3716  fail_union_operation (op);
3717  return;
3718  }
3719 
3720 #if MEASURE_PERFORMANCE
3721  perf_store.request_full.received += 1;
3722 #endif
3723 
3725  "Received request for full set transmission\n");
3726 
3729  op->set->content->elements);
3730  uint64_t avg_element_size = 0;
3731  if (0 < op->local_element_count)
3732  {
3733  op->total_elements_size_local = 0;
3735  &
3737  op);
3738  avg_element_size = op->total_elements_size_local / op->local_element_count;
3739  }
3740 
3741  int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3742  op->
3744  op->
3746  op->local_set_diff,
3747  op->remote_set_diff,
3748  op->
3750  op->
3752  if (FULL_SYNC_REMOTE_SENDING_FIRST != mode_of_operation)
3753  {
3755  "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3756  " : %d\n", mode_of_operation);
3757  GNUNET_break_op (0);
3758  fail_union_operation (op);
3759  return;
3760  }
3761 
3762  // FIXME: we need to check that our set is larger than the
3763  // byzantine_lower_bound by some threshold
3764  send_full_set (op);
3766 }
3767 
3768 
/*
 * Handle the other peer's "full done" message.
 *
 * After the phase check (PHASE_FULL_SENDING or PHASE_FULL_RECEIVING):
 *  - in PHASE_FULL_RECEIVING: in byzantine mode the operation is failed
 *    unless exactly op->remote_element_count elements were received; then
 *    the elements the other peer is missing are sent, the envelope `ev`
 *    is queued on op->mq and the phase becomes PHASE_FINISHED;
 *  - in PHASE_FULL_SENDING: the phase becomes PHASE_FINISHED and
 *    send_client_done() is called;
 *  - any other phase fails the operation.
 *
 * NOTE(review): the signature line, LOG call heads, the iteration call
 * head and the statement creating `ev` are not present in this listing.
 */
3775 static void
3777  const struct GNUNET_MessageHeader *mh)
3778 {
3779  struct Operation *op = cls;
3780 
3784  uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3785  if (GNUNET_OK !=
3786  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3787  {
3788  GNUNET_break (0);
3789  fail_union_operation (op);
3790  return;
3791  }
3792 
3793 #if MEASURE_PERFORMANCE
3794  perf_store.full_done.received += 1;
3795 #endif
3796 
3797  switch (op->phase)
3798  {
3799  case PHASE_FULL_RECEIVING:
3800  {
3801  struct GNUNET_MQ_Envelope *ev;
3802 
3803  if ((GNUNET_YES == op->byzantine) &&
3804  (op->received_total != op->remote_element_count) )
3805  {
3806  /* The other peer gave not enough elements before sending full done, there's something wrong. */
3808  "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3809  (unsigned long long) op->received_total,
3810  (unsigned long long) op->remote_element_count);
3811  GNUNET_break_op (0);
3812  fail_union_operation (op);
3813  return;
3814  }
3815 
3817  "got FULL DONE, sending elements that other peer is missing\n");
3818 
3819  /* send all the elements that did not come from the remote peer */
3822  op);
3823 #if MEASURE_PERFORMANCE
3824  perf_store.full_done.sent += 1;
3825 #endif
3827  GNUNET_MQ_send (op->mq,
3828  ev);
3829  op->phase = PHASE_FINISHED;
3830  /* we now wait until the other peer sends us the OVER message*/
3831  }
3832  break;
3833 
3834  case PHASE_FULL_SENDING:
3835  {
3837  "got FULL DONE, finishing\n");
3838  /* We sent the full set, and got the response for that. We're done. */
3839  op->phase = PHASE_FINISHED;
3841  send_client_done (op);
3843  return;
3844  }
3845 
3846  default:
3848  "Handle full done phase is %u\n",
3849  (unsigned) op->phase);
3850  GNUNET_break_op (0);
3851  fail_union_operation (op);
3852  return;
3853  }
3855 }
3856 
3857 
/*
 * Validate a demand message before it is handled.
 *
 * The payload after the message header must be an exact multiple of
 * sizeof(struct GNUNET_HashCode).  The operation pointer is deliberately
 * unused.
 *
 * Returns GNUNET_OK if the size is valid, GNUNET_SYSERR otherwise.
 */
3866 static int
3868  const struct GNUNET_MessageHeader *mh)
3869 {
3870  struct Operation *op = cls;
3871  unsigned int num_hashes;
3872 
3873  (void) op;
3874  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3875  / sizeof(struct GNUNET_HashCode);
 /* Reject payloads that are not a whole number of hash codes. */
3876  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3877  != num_hashes * sizeof(struct GNUNET_HashCode))
3878  {
3879  GNUNET_break_op (0);
3880  return GNUNET_SYSERR;
3881  }
3882  return GNUNET_OK;
3883 }
3884 
3885 
/**
 * Handle a demand message: for every hash listed in the message, send the
 * corresponding element to the other peer.
 *
 * For each hash the code fails the operation if no element entry was found,
 * if one of the two message-control checks does not return #GNUNET_YES, or
 * if the element does not belong to this operation. Otherwise it builds an
 * element message, copies the element data into it, queues it on op->mq,
 * bumps the "# exchanged elements" statistic and, when op->symmetric is
 * set, also passes the element to the local client. After the loop
 * maybe_finish() is called.
 *
 * NOTE(review): several listing lines are absent from this extract (the
 * function-name line 3894, the end of the allowed_phases initializer, the
 * element lookup call, both message-control calls and some arguments), so
 * the exact calls cannot be confirmed from here. Presumably this is
 * handle_union_p2p_demand, matching the `union_p2p_demand` handler entries.
 *
 * @param cls the `struct Operation`
 * @param mh the demand message; payload is an array of hash codes
 */
3893 static void
3895  const struct GNUNET_MessageHeader *mh)
3896 {
3897  struct Operation *op = cls;
3898  struct ElementEntry *ee;
3899  struct GNUNET_SETU_ElementMessage *emsg;
3900  const struct GNUNET_HashCode *hash;
3901  unsigned int num_hashes;
3902  struct GNUNET_MQ_Envelope *ev;
3903 
3907  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3909  if (GNUNET_OK !=
3910  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3911  {
3912  GNUNET_break (0);
3913  fail_union_operation (op);
3914  return;
3915  }
3916 #if MEASURE_PERFORMANCE
3917  perf_store.demand.received += 1;
3918  perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3920 #endif
3921 
3922  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3923  / sizeof(struct GNUNET_HashCode);
/* Walk the hash array that directly follows the message header. */
3924  for (hash = (const struct GNUNET_HashCode *) &mh[1];
3925  num_hashes > 0;
3926  hash++, num_hashes--)
3927  {
3929  hash);
3930  if (NULL == ee)
3931  {
3932  /* Demand for non-existing element. */
3933  GNUNET_break_op (0);
3934  fail_union_operation (op);
3935  return;
3936  }
3937 
3938  /* Save send demand message for message control */
3939  if (GNUNET_YES !=
3943  &ee->element_hash,
3945  )
3946  {
3948  "Double demand message received found!\n");
3949  GNUNET_break (0);
3950  fail_union_operation (op);
3951  return;
3952  }
3953  ;
3954 
3955  /* Mark element as sent for message control */
3956  if (GNUNET_YES !=
3959  MSG_CFS_SENT,
3960  &ee->element_hash,
3962  )
3963  {
3965  "Double element message sent found!\n");
3966  GNUNET_break (0);
3967  fail_union_operation (op);
3968  return;
3969  }
3970  if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
3971  {
3972  /* Probably confused lazily copied sets. */
3973  GNUNET_break_op (0);
3974  fail_union_operation (op);
3975  return;
3976  }
3977 #if MEASURE_PERFORMANCE
3978  perf_store.element.sent += 1;
3979  perf_store.element.sent_var_bytes += ee->element.size;
3980 #endif
/* Element payload is appended right after the fixed-size element message. */
3981  ev = GNUNET_MQ_msg_extra (emsg,
3982  ee->element.size,
3984  GNUNET_memcpy (&emsg[1],
3985  ee->element.data,
3986  ee->element.size);
3987  emsg->reserved = htons (0);
3988  emsg->element_type = htons (ee->element.element_type);
3990  "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
3991  op,
3992  (unsigned int) ee->element.size,
3993  GNUNET_h2s (&ee->element_hash));
3994  GNUNET_MQ_send (op->mq, ev);
3995  GNUNET_STATISTICS_update (_GSS_statistics,
3996  "# exchanged elements",
3997  1,
3998  GNUNET_NO);
3999  if (op->symmetric)
4000  send_client_element (op,
4001  &ee->element,
4003  }
4005  maybe_finish (op);
4006 }
4007 
4008 
/**
 * Validate an offer message: it is only accepted while the operation is in
 * PHASE_PASSIVE_DECODING or PHASE_ACTIVE_DECODING, and its payload must be
 * a whole number of `struct GNUNET_HashCode`s.
 *
 * NOTE(review): listing line 4017 (function name and `cls` parameter) is
 * absent from this extract; presumably check_union_p2p_offer, matching the
 * `union_p2p_offer` handler entries.
 *
 * @param cls the `struct Operation`
 * @param mh the offer message to validate
 * @return #GNUNET_OK if phase and length are acceptable,
 *         #GNUNET_SYSERR otherwise
 */
4016 static int
4018  const struct GNUNET_MessageHeader *mh)
4019 {
4020  struct Operation *op = cls;
4021  unsigned int num_hashes;
4022 
4023  /* look up elements and send them */
4024  if ((op->phase != PHASE_PASSIVE_DECODING) &&
4025  (op->phase != PHASE_ACTIVE_DECODING))
4026  {
4027  GNUNET_break_op (0);
4028  return GNUNET_SYSERR;
4029  }
4030  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4031  / sizeof(struct GNUNET_HashCode);
4032  if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4033  num_hashes * sizeof(struct GNUNET_HashCode))
4034  {
4035  GNUNET_break_op (0);
4036  return GNUNET_SYSERR;
4037  }
4038  return GNUNET_OK;
4039 }
4040 
4041 
/**
 * Handle an offer message: for every offered hash that we do not already
 * hold for this operation and have not already demanded, send a demand
 * message carrying that single hash to the other peer.
 *
 * Hashes whose element entry already belongs to the operation are skipped,
 * as are hashes for which the duplicate-demand check returns #GNUNET_YES.
 * For the rest a demand envelope is allocated, three message-control
 * checks are made (each failing the operation unless it returns
 * #GNUNET_YES), the hash is copied into the demand and it is queued on
 * op->mq.
 *
 * NOTE(review): the function-name line (4050), the element lookup, the
 * demanded-hashes calls, the message-control calls and the closing call at
 * 4177 are absent from this extract. Presumably this is
 * handle_union_p2p_offer.
 *
 * NOTE(review): `ev` is allocated before the three message-control checks
 * and the failure branches return without releasing it (only commented-out
 * frees remain); this looks like an envelope leak on those paths -- confirm
 * against the full source and the MQ API for discarding unsent envelopes.
 *
 * @param cls the `struct Operation`
 * @param mh the offer message; payload is an array of hash codes
 */
4049 static void
4051  const struct GNUNET_MessageHeader *mh)
4052 {
4053  struct Operation *op = cls;
4054  const struct GNUNET_HashCode *hash;
4055  unsigned int num_hashes;
4059  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4060  if (GNUNET_OK !=
4061  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4062  {
4063  GNUNET_break (0);
4064  fail_union_operation (op);
4065  return;
4066  }
4067 
4068 #if MEASURE_PERFORMANCE
4069  perf_store.offer.received += 1;
4070  perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4072 #endif
4073 
4074  num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4075  / sizeof(struct GNUNET_HashCode);
/* Walk the hash array that directly follows the message header. */
4076  for (hash = (const struct GNUNET_HashCode *) &mh[1];
4077  num_hashes > 0;
4078  hash++, num_hashes--)
4079  {
4080  struct ElementEntry *ee;
4081  struct GNUNET_MessageHeader *demands;
4082  struct GNUNET_MQ_Envelope *ev;
4083 
4085  hash);
/* Nothing to demand if the element is already part of this operation. */
4086  if (NULL != ee)
4087  if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
4088  continue;
4089 
4090  if (GNUNET_YES ==
4092  hash))
4093  {
4095  "Skipped sending duplicate demand\n");
4096  continue;
4097  }
4098 
4101  op->demanded_hashes,
4102  hash,
4103  NULL,
4105 
4107  "[OP %p] Requesting element (hash %s)\n",
4108  op, GNUNET_h2s (hash));
4109 
4110 #if MEASURE_PERFORMANCE
4111  perf_store.demand.sent += 1;
4112  perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4113 #endif
/* Each demand message carries exactly one hash code as payload. */
4114  ev = GNUNET_MQ_msg_header_extra (demands,
4115  sizeof(struct GNUNET_HashCode),
4117  /* Record the outgoing demand message for message control */
4118  if (GNUNET_YES !=
4121  MSG_CFS_SENT,
4122  hash,
4124  )
4125  {
4126  // GNUNET_free (ev);
4128  "Double demand message sent found!\n");
4129  GNUNET_break (0);
4130  fail_union_operation (op);
4131  return;
4132  }
4133  ;
4134 
4135  /* Mark offer as received */
4136  if (GNUNET_YES !=
4140  hash,
4141  OFFER_MESSAGE)
4142  )
4143  {
4144  // GNUNET_free (ev);
4146  "Double offer message received found!\n");
4147  GNUNET_break (0);
4148  fail_union_operation (op);
4149  return;
4150  }
4151  ;
4152 
4153  /* Mark element as expected to be received */
4154  if (GNUNET_YES !=
4158  hash,
4160  )
4161  {
4162  // GNUNET_free (ev);
4164  "Element already expected!\n");
4165  GNUNET_break (0);
4166  fail_union_operation (op);
4167  return;
4168  }
4169  ;
4170 
4171 
4172  GNUNET_memcpy (&demands[1],
4173  hash,
4174  sizeof(struct GNUNET_HashCode));
4175  GNUNET_MQ_send (op->mq, ev);
4176  }
4178 }
4179 
4180 
/**
 * Handle a "done" message from the other peer.
 *
 * The message is only accepted in PHASE_ACTIVE_DECODING or
 * PHASE_PASSIVE_DECODING; in any other phase the operation is failed. A
 * further check (its condition is not visible here) fails the operation
 * with a "role change is necessary" protocol-violation log. The visible
 * active-decoding branch moves the operation to PHASE_FINISH_CLOSING and
 * calls maybe_finish().
 *
 * NOTE(review): the function-name line (4188), the condition at 4205, the
 * first `case` label of the switch and the terminator line of the long
 * block comment in the passive branch are absent from this extract, so the
 * passive branch cannot be read reliably from here. Presumably this is
 * handle_union_p2p_done, matching the `union_p2p_done` handler entries.
 *
 * @param cls the `struct Operation`
 * @param mh the done message (no payload is read here)
 */
4187 static void
4189  const struct GNUNET_MessageHeader *mh)
4190 {
4191  struct Operation *op = cls;
4192 
4196  uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4197  if (GNUNET_OK !=
4198  check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4199  {
4200  GNUNET_break (0);
4201  fail_union_operation (op);
4202  return;
4203  }
4204 
4206  {
4208  "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4209  GNUNET_break (0);
4210  fail_union_operation (op);
4211  return;
4212  }
4213 
4214 #if MEASURE_PERFORMANCE
4215  perf_store.done.received += 1;
4216 #endif
4217  switch (op->phase)
4218  {
4220  /* We got all requests, but still have to send our elements in response. */
4223  "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4224  /* The active peer is done sending offers
4225  * and inquiries. This means that all
4226  * our responses to that (demands and offers)
4227  * must be in flight (queued or in mesh).
4228  *
4229  * We should notify the active peer once
4230  * all our demands are satisfied, so that the active
4231  * peer can quit if we gave it everything.
4233  maybe_finish (op);
4234  return;
4235  case PHASE_ACTIVE_DECODING:
4237  "got DONE (as active partner), waiting to finish\n");
4238  /* All demands of the other peer are satisfied,
4239  * and we processed all offers, thus we know
4240  * exactly what our demands must be.
4241  *
4242  * We'll close the channel
4243  * to the other peer once our demands are met.
4244  */op->phase = PHASE_FINISH_CLOSING;
4246  maybe_finish (op);
4247  return;
4248  default:
4249  GNUNET_break_op (0);
4250  fail_union_operation (op);
4251  return;
4252  }
4253 }
4254 
4255 
/**
 * Handle an "over" message from the other peer by reporting completion to
 * the local client via send_client_done().
 *
 * NOTE(review): listing line 4263 (function name and `cls` parameter) is
 * absent from this extract; presumably handle_union_p2p_over, matching the
 * `union_p2p_over` handler entries.
 *
 * @param cls closure passed straight to send_client_done()
 * @param mh the over message (not read)
 */
4262 static void
4264  const struct GNUNET_MessageHeader *mh)
4265 {
4266 #if MEASURE_PERFORMANCE
4267  perf_store.over.received += 1;
4268 #endif
4269  send_client_done (cls);
4270 }
4271 
4272 
/**
 * Find an incoming operation by its suggest id.
 *
 * Walks every listener starting at listener_head and, for each one, every
 * operation in that listener's op list, returning the first operation whose
 * suggest_id equals @a id.
 *
 * @param id the suggest id to look for
 * @return the matching operation, or NULL if no listener holds one
 */
4281 static struct Operation *
4282 get_incoming (uint32_t id)
4283 {
4284  for (struct Listener *listener = listener_head;
4285  NULL != listener;
4286  listener = listener->next)
4287  {
4288  for (struct Operation *op = listener->op_head;
4289  NULL != op;
4290  op = op->next)
4291  if (op->suggest_id == id)
4292  return op;
4293  }
4294  return NULL;
4295 }
4296 
4297 
/**
 * Called when a client connects: counts the client and allocates the
 * per-client state that holds the client handle and its message queue.
 *
 * NOTE(review): listing line 4307 (function name and `cls` parameter) is
 * absent from this extract; presumably client_connect_cb.
 *
 * @param c the client handle, stored in the new state
 * @param mq the client's message queue, stored in the new state
 * @return the newly allocated `struct ClientState`
 */
4306 static void *
4308  struct GNUNET_SERVICE_Client *c,
4309  struct GNUNET_MQ_Handle *mq)
4310 {
4311  struct ClientState *cs;
4312 
4313  num_clients++;
4314  cs = GNUNET_new (struct ClientState);
4315  cs->client = c;
4316  cs->mq = mq;
4317  return cs;
4318 }
4319 
4320 
/**
 * Map iterator callback that frees one element entry.
 *
 * NOTE(review): listing line 4330 (function name and `cls` parameter) is
 * absent from this extract; presumably destroy_elements_iterator.
 *
 * @param key hash of the entry (unused)
 * @param value the `struct ElementEntry` to free
 * @return #GNUNET_YES, always
 */
4329 static int
4331  const struct GNUNET_HashCode *key,
4332  void *value)
4333 {
4334  struct ElementEntry *ee = value;
4335 
4336  GNUNET_free (ee);
4337  return GNUNET_YES;
4338 }
4339 
4340 
/**
 * Clean up after a client has disconnected.
 *
 * If the client owned a set: destroys all of the set's pending operations,
 * destroys its strata estimator, drops one reference on the set content
 * (freeing the content when the count reaches zero) and frees the set.
 * If the client owned a listener: closes its CADET port, destroys every
 * incoming operation still queued on it, unlinks it from the global
 * listener list and frees it. Finally frees the client state, decrements
 * num_clients and, if shutdown is in progress and this was the last
 * client, disconnects from CADET.
 *
 * NOTE(review): the function-name line (4349), the log call heads and the
 * calls that release content->elements (4383-4386) are absent from this
 * extract.
 *
 * @param client the client handle (not used in the visible code)
 * @param internal_cls the `struct ClientState` of the client
 */
4348 static void
4350  struct GNUNET_SERVICE_Client *client,
4351  void *internal_cls)
4352 {
4353  struct ClientState *cs = internal_cls;
4354  struct Operation *op;
4355  struct Listener *listener;
4356  struct Set *set;
4357 
4359  "Client disconnected, cleaning up\n");
4360  if (NULL != (set = cs->set))
4361  {
4362  struct SetContent *content = set->content;
4363 
4365  "Destroying client's set\n");
4366  /* Destroy pending set operations */
4367  while (NULL != set->ops_head)
4368  _GSS_operation_destroy (set->ops_head);
4369 
4370  /* Destroy operation-specific state */
4371  if (NULL != set->se)
4372  {
4373  strata_estimator_destroy (set->se);
4374  set->se = NULL;
4375  }
4376  /* free set content (or at least decrement RC) */
4377  set->content = NULL;
4378  GNUNET_assert (0 != content->refcount);
4379  content->refcount--;
4380  if (0 == content->refcount)
4381  {
4382  GNUNET_assert (NULL != content->elements);
4385  NULL);
4387  content->elements = NULL;
4388  GNUNET_free (content);
4389  }
4390  GNUNET_free (set);
4391  }
4392 
4393  if (NULL != (listener = cs->listener))
4394  {
4396  "Destroying client's listener\n");
4397  GNUNET_CADET_close_port (listener->open_port);
4398  listener->open_port = NULL;
/* Loop until incoming_destroy() has emptied the listener's op list. */
4399  while (NULL != (op = listener->op_head))
4400  {
4402  "Destroying incoming operation `%u' from peer `%s'\n",
4403  (unsigned int) op->client_request_id,
4404  GNUNET_i2s (&op->peer));
4405  incoming_destroy (op);
4406  }
4407  GNUNET_CONTAINER_DLL_remove (listener_head,
4408  listener_tail,
4409  listener);
4410  GNUNET_free (listener);
4411  }
4412  GNUNET_free (cs);
4413  num_clients--;
/* Last client gone during shutdown: now the CADET handle can be released. */
4414  if ( (GNUNET_YES == in_shutdown) &&
4415  (0 == num_clients) )
4416  {
4417  if (NULL != cadet)
4418  {
4419  GNUNET_CADET_disconnect (cadet);
4420  cadet = NULL;
4421  }
4422  }
4423 }
4424 
4425 
/**
 * Validate an operation request received on an incoming channel.
 *
 * Rejects the message if the operation already has a suggest id (a second
 * request on the same channel), if the operation has no listener, or if a
 * nested context message is present and larger than
 * GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE.
 *
 * NOTE(review): listing line 4435 (function name and `cls` parameter) is
 * absent from this extract; presumably check_incoming_msg, matching the
 * `incoming_msg` handler entries.
 *
 * @param cls the `struct Operation` of the channel
 * @param msg the operation request
 * @return #GNUNET_OK if the request is acceptable, #GNUNET_SYSERR otherwise
 */
4434 static int
4436  const struct OperationRequestMessage *msg)
4437 {
4438  struct Operation *op = cls;
4439  struct Listener *listener = op->listener;
4440  const struct GNUNET_MessageHeader *nested_context;
4441 
4442  /* double operation request */
4443  if (0 != op->suggest_id)
4444  {
4445  GNUNET_break_op (0);
4446  return GNUNET_SYSERR;
4447  }
4448  /* This should be equivalent to the previous condition, but can't hurt to check twice */
4449  if (NULL == listener)
4450  {
4451  GNUNET_break (0);
4452  return GNUNET_SYSERR;
4453  }
4454  nested_context = GNUNET_MQ_extract_nested_mh (msg);
4455  if ((NULL != nested_context) &&
4456  (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4457  {
4458  GNUNET_break_op (0);
4459  return GNUNET_SYSERR;
4460  }
4461  return GNUNET_OK;
4462 }
4463 
4464 
/**
 * Handle an operation request from a remote peer on a listener's channel.
 *
 * Copies the nested context message (if any) into the operation, records
 * the remote element count, assigns the operation a fresh non-zero suggest
 * id, clears its timeout task pointer and sends a request message carrying
 * the accept id and the peer identity to the listener's client.
 *
 * NOTE(review): the function-name line (4481), the log-level arguments,
 * the call at 4506 that precedes clearing op->timeout_task, and the
 * message-type argument at 4509 are absent from this extract. Presumably
 * this is handle_incoming_msg.
 *
 * @param cls the `struct Operation` of the channel
 * @param msg the operation request
 */
4480 static void
4482  const struct OperationRequestMessage *msg)
4483 {
4484  struct Operation *op = cls;
4485  struct Listener *listener = op->listener;
4486  const struct GNUNET_MessageHeader *nested_context;
4487  struct GNUNET_MQ_Envelope *env;
4488  struct GNUNET_SETU_RequestMessage *cmsg;
4489 
4490  nested_context = GNUNET_MQ_extract_nested_mh (msg);
4491  /* Make a copy of the nested_context (application-specific context
4492  information that is opaque to set) so we can pass it to the
4493  listener later on */
4494  if (NULL != nested_context)
4495  op->context_msg = GNUNET_copy_message (nested_context);
4496  op->remote_element_count = ntohl (msg->element_count);
4497  GNUNET_log (
4499  "Received P2P operation request (port %s) for active listener\n",
4500  GNUNET_h2s (&op->listener->app_id));
4501  GNUNET_assert (0 == op->suggest_id);
/* Zero means "no suggest id assigned", so skip it when the counter wraps. */
4502  if (0 == suggest_id)
4503  suggest_id++;
4504  op->suggest_id = suggest_id++;
4505  GNUNET_assert (NULL != op->timeout_task);
4507  op->timeout_task = NULL;
4508  env = GNUNET_MQ_msg_nested_mh (cmsg,
4510  op->context_msg);
4511  GNUNET_log (
4513  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4514  op->suggest_id,
4515  listener,
4516  listener->cs);
4517  cmsg->accept_id = htonl (op->suggest_id);
4518  cmsg->peer_id = op->peer;
4519  GNUNET_MQ_send (listener->cs->mq,
4520  env);
4521  /* NOTE: GNUNET_CADET_receive_done() will be called in
4522  #handle_client_accept() */
4523 }
4524 
4525 
/**
 * Handle a client's request to create a new set.
 *
 * A client may own only one set; a second request is treated as an error.
 * Otherwise a `struct Set` is allocated together with a strata estimator
 * and a fresh `struct SetContent` (reference count 1, empty element map),
 * and the set and client state are linked to each other.
 *
 * NOTE(review): the function-name line (4535), the strata estimator
 * creation call (4554-4555) and the calls made before each early return
 * and at the end of the function are absent from this extract. Presumably
 * this is handle_client_create_set, matching the `client_create_set`
 * service handler.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the create message (not read in the visible code)
 */
4534 static void
4536  const struct GNUNET_SETU_CreateMessage *msg)
4537 {
4538  struct ClientState *cs = cls;
4539  struct Set *set;
4540 
4542  "Client created new set for union operation\n");
4543  if (NULL != cs->set)
4544  {
4545  /* There can only be one set per client */
4546  GNUNET_break (0);
4548  return;
4549  }
4550  set = GNUNET_new (struct Set);
4551  {
4552  struct MultiStrataEstimator *se;
4553 
4556  SE_IBF_HASH_NUM);
4557  if (NULL == se)
4558  {
4560  "Failed to allocate strata estimator\n");
4561  GNUNET_free (set);
4563  return;
4564  }
4565  set->se = se;
4566  }
4567  set->content = GNUNET_new (struct SetContent);
4568  set->content->refcount = 1;
4569  set->content->elements = GNUNET_CONTAINER_multihashmap_create (1,
4570  GNUNET_YES);
4571  set->cs = cs;
4572  cs->set = set;
4574 }
4575 
4576 
/**
 * Timeout task for an incoming operation: clears the operation's
 * timeout_task pointer and destroys the incoming operation.
 *
 * NOTE(review): listing line 4587 (function name and parameter list) is
 * absent from this extract; presumably incoming_timeout_cb (void *cls).
 *
 * @param cls the `struct Operation` whose request timed out
 */
4586 static void
4588 {
4589  struct Operation *op = cls;
4590 
/* The task is running, so the stored handle is no longer valid. */
4591  op->timeout_task = NULL;
4593  "Remote peer's incoming request timed out\n");
4594  incoming_destroy (op);
4595 }
4596 
4597 
/**
 * Called by CADET when a remote peer opens a channel to one of our
 * listening ports. Allocates a new operation bound to the listener, the
 * peer and the channel, and returns it as the channel's closure.
 *
 * NOTE(review): several listing lines are absent from this extract (the
 * log head, the statements at 4629-4634 that take `UINT32_MAX` and `op` as
 * arguments, and the head of the DLL insertion into the listener's list),
 * so the salt/timeout setup cannot be confirmed from here.
 *
 * @param cls the `struct Listener` the channel was opened for
 * @param channel the new CADET channel
 * @param source identity of the peer that opened the channel
 * @return the new `struct Operation`
 */
4614 static void *
4615 channel_new_cb (void *cls,
4616  struct GNUNET_CADET_Channel *channel,
4617  const struct GNUNET_PeerIdentity *source)
4618 {
4619  struct Listener *listener = cls;
4620  struct Operation *op;
4621 
4623  "New incoming channel\n");
4624  op = GNUNET_new (struct Operation);
4625  op->listener = listener;
4626  op->peer = *source;
4627  op->channel = channel;
4628  op->mq = GNUNET_CADET_get_mq (op->channel);
4630  UINT32_MAX);
4633  op);
4635  listener->op_tail,
4636  op);
4637  return op;
4638 }
4639 
4640 
/**
 * Called by CADET when a channel is destroyed. Clears the operation's
 * channel pointer so it is not used again.
 *
 * NOTE(review): listing line 4664 (the statement following the pointer
 * reset) is absent from this extract, so the subsequent cleanup call cannot
 * be confirmed from here.
 *
 * @param channel_ctx the `struct Operation` associated with the channel
 * @param channel the channel being destroyed (not read)
 */
4657 static void
4658 channel_end_cb (void *channel_ctx,
4659  const struct GNUNET_CADET_Channel *channel)
4660 {
4661  struct Operation *op = channel_ctx;
4662 
4663  op->channel = NULL;
4665 }
4666 
4667 
/**
 * Window-size change callback for a CADET channel. Currently a no-op.
 *
 * NOTE(review): listing line 4683 (function name and `cls` parameter) is
 * absent from this extract; presumably channel_window_cb.
 *
 * @param channel the channel whose window changed (unused)
 * @param window_size the new window size (unused)
 */
4682 static void
4684  const struct GNUNET_CADET_Channel *channel,
4685  int window_size)
4686 {
4687  /* FIXME: not implemented, we could do flow control here... */
4688 }
4689 
4690 
/**
 * Handle a client's request to listen for incoming set operations.
 *
 * A client may have at most one listener; a second request is treated as
 * an error. Otherwise a `struct Listener` is allocated, linked to the
 * client state, added to the global listener list, and a CADET port is
 * opened for the application id from the message with channel_new_cb and
 * channel_end_cb as channel callbacks and the P2P handler table below.
 *
 * NOTE(review): the function-name line (4699), the message-type argument
 * of every handler entry, the handler-array terminator, one argument of
 * GNUNET_CADET_open_port (4785) and the calls before the early return and
 * at the end of the function are absent from this extract. Presumably this
 * is handle_client_listen.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the listen message; supplies the application id
 */
4698 static void
4700  const struct GNUNET_SETU_ListenMessage *msg)
4701 {
4702  struct ClientState *cs = cls;
/* Closures are NULL here: the per-channel operation is only created later
   in channel_new_cb. */
4703  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4704  GNUNET_MQ_hd_var_size (incoming_msg,
4706  struct OperationRequestMessage,
4707  NULL),
4708  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4710  struct IBFMessage,
4711  NULL),
4712  GNUNET_MQ_hd_var_size (union_p2p_elements,
4715  NULL),
4716  GNUNET_MQ_hd_var_size (union_p2p_offer,
4718  struct GNUNET_MessageHeader,
4719  NULL),
4720  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4722  struct InquiryMessage,
4723  NULL),
4724  GNUNET_MQ_hd_var_size (union_p2p_demand,
4726  struct GNUNET_MessageHeader,
4727  NULL),
4728  GNUNET_MQ_hd_fixed_size (union_p2p_done,
4730  struct GNUNET_MessageHeader,
4731  NULL),
4732  GNUNET_MQ_hd_fixed_size (union_p2p_over,
4734  struct GNUNET_MessageHeader,
4735  NULL),
4736  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4738  struct GNUNET_MessageHeader,
4739  NULL),
4740  GNUNET_MQ_hd_var_size (union_p2p_request_full,
4742  struct TransmitFullMessage,
4743  NULL),
4744  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4746  struct StrataEstimatorMessage,
4747  NULL),
4748  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4750  struct StrataEstimatorMessage,
4751  NULL),
4752  GNUNET_MQ_hd_var_size (union_p2p_full_element,
4755  NULL),
4756  GNUNET_MQ_hd_var_size (union_p2p_send_full,
4758  struct TransmitFullMessage,
4759  NULL),
4761  };
4762  struct Listener *listener;
4763 
4764  if (NULL != cs->listener)
4765  {
4766  /* max. one active listener per client! */
4767  GNUNET_break (0);
4769  return;
4770  }
4771  listener = GNUNET_new (struct Listener);
4772  listener->cs = cs;
4773  cs->listener = listener;
4774  listener->app_id = msg->app_id;
4775  GNUNET_CONTAINER_DLL_insert (listener_head,
4776  listener_tail,
4777  listener);
4779  "New listener created (port %s)\n",
4780  GNUNET_h2s (&listener->app_id));
4781  listener->open_port = GNUNET_CADET_open_port (cadet,
4782  &msg->app_id,
4783  &channel_new_cb,
4784  listener,
4786  &channel_end_cb,
4787  cadet_handlers);
4789 }
4790 
4791 
/**
 * Handle a client's rejection of a suggested incoming operation.
 *
 * Looks the operation up by the accept/reject id from the message. If no
 * such operation exists (for example because the other peer already went
 * away), this is only logged.
 *
 * NOTE(review): the function-name line (4800), the log heads and the
 * statements at 4814 and 4820-4821 (what happens to the operation and how
 * the client is continued) are absent from this extract. Presumably this
 * is handle_client_reject.
 *
 * @param cls the `struct ClientState` of the rejecting client
 * @param msg the reject message; supplies the accept/reject id
 */
4799 static void
4801  const struct GNUNET_SETU_RejectMessage *msg)
4802 {
4803  struct ClientState *cs = cls;
4804  struct Operation *op;
4805 
4806  op = get_incoming (ntohl (msg->accept_reject_id));
4807  if (NULL == op)
4808  {
4809  /* no matching incoming operation for this reject;
4810  could be that the other peer already disconnected... */
4812  "Client rejected unknown operation %u\n",
4813  (unsigned int) ntohl (msg->accept_reject_id));
4815  return;
4816  }
4818  "Peer request (app %s) rejected by client\n",
4819  GNUNET_h2s (&cs->listener->app_id));
4822 }
4823 
4824 
/**
 * Validate a client's add-element message. No checks are performed; every
 * message is accepted.
 *
 * NOTE(review): listing line 4832 (function name and `cls` parameter) is
 * absent from this extract; presumably check_client_set_add, matching the
 * `client_set_add` service handler.
 *
 * @param msg the element message (not inspected)
 * @return #GNUNET_OK, always
 */
4831 static int
4833  const struct GNUNET_SETU_ElementMessage *msg)
4834 {
4835  /* NOTE: Technically, we should probably check with the
4836  block library whether the element we are given is well-formed */
4837  return GNUNET_OK;
4838 }
4839 
4840 
/**
 * Handle a client's request to add an element to its set.
 *
 * Fails if the client has no set. Otherwise the element's hash is looked
 * up in the set content; if it is not yet present, a new element entry is
 * allocated with the element data stored directly behind it, marked as
 * local (not remote), stamped with the set's current generation, and the
 * element's IBF key is inserted into the set's strata estimator. Adding an
 * element that already exists is logged and ignored.
 *
 * NOTE(review): the function-name line (4848), the hash computation call,
 * the assignment at 4883, the head of the map insertion and the calls that
 * continue or drop the client are absent from this extract. Presumably
 * this is handle_client_set_add.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the element message; the element data follows the struct
 */
4847 static void
4849  const struct GNUNET_SETU_ElementMessage *msg)
4850 {
4851  struct ClientState *cs = cls;
4852  struct Set *set;
4853  struct GNUNET_SETU_Element el;
4854  struct ElementEntry *ee;
4855  struct GNUNET_HashCode hash;
4856 
4857  if (NULL == (set = cs->set))
4858  {
4859  /* client without a set requested an operation */
4860  GNUNET_break (0);
4862  return;
4863  }
4865  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
/* Element payload is whatever follows the fixed-size message struct. */
4866  el.size = ntohs (msg->header.size) - sizeof(*msg);
4867  el.data = &msg[1];
4868  el.element_type = ntohs (msg->element_type);
4870  &hash);
4871  ee = GNUNET_CONTAINER_multihashmap_get (set->content->elements,
4872  &hash);
4873  if (NULL == ee)
4874  {
4876  "Client inserts element %s of size %u\n",
4877  GNUNET_h2s (&hash),
4878  el.size);
/* Single allocation: entry header followed by a private copy of the data. */
4879  ee = GNUNET_malloc (el.size + sizeof(*ee));
4880  ee->element.size = el.size;
4881  GNUNET_memcpy (&ee[1], el.data, el.size);
4882  ee->element.data = &ee[1];
4884  ee->remote = GNUNET_NO;
4885  ee->generation = set->current_generation;
4886  ee->element_hash = hash;
4889  set->content->elements,
4890  &ee->element_hash,
4891  ee,
4893  }
4894  else
4895  {
4897  "Client inserted element %s of size %u twice (ignored)\n",
4898  GNUNET_h2s (&hash),
4899  el.size);
4900  /* same element inserted twice */
4901  return;
4902  }
4903  strata_estimator_insert (set->se,
4904  get_ibf_key (&ee->element_hash));
4905 }
4906 
4907 
/**
 * Advance the generation counters of a set: increments both the content's
 * latest_generation and the set's current_generation.
 *
 * NOTE(review): listing line 4915 (function name and parameter list) is
 * absent from this extract; callers in this file invoke it as
 * advance_generation (set).
 */
4914 static void
4916 {
4917  set->content->latest_generation++;
4918  set->current_generation++;
4919 }
4920 
4921 
/**
 * Validate a client's evaluate message. No checks are performed; every
 * message is accepted.
 *
 * NOTE(review): listing line 4932 (function name and `cls` parameter) is
 * absent from this extract; presumably check_client_evaluate, matching the
 * `client_evaluate` service handler.
 *
 * @param msg the evaluate message (not inspected)
 * @return #GNUNET_OK, always
 */
4931 static int
4933  const struct GNUNET_SETU_EvaluateMessage *msg)
4934 {
4935  /* FIXME: suboptimal, even if the context below could be NULL,
4936  there are malformed messages this does not check for... */
4937  return GNUNET_OK;
4938 }
4939 
4940 
/**
 * Handle a client's request to start a set union operation with a remote
 * peer.
 *
 * Allocates a new operation, fails if the client has no set, copies the
 * options from the message (target peer, request id, byzantine settings,
 * force_full / force_delta, symmetric), records the set's current
 * generation and advances it, adds the operation to the set's operation
 * list, creates a CADET channel to the target peer and port, duplicates
 * the set's strata estimator for the operation, moves to PHASE_EXPECT_SE
 * and sends an operation request (with the optional nested context
 * message) to the peer.
 *
 * NOTE(review): many listing lines are absent from this extract (the
 * function-name line 4950, all handler message types, the handler
 * terminator, the salt/peer_site setup around 5025 and 5034-5037, the
 * message-control map creation, the request field assignments and the
 * calls before each early return), so those steps cannot be confirmed.
 *
 * NOTE(review): the last handler entry passes NULL as closure while all
 * others pass `op`; whether this matters depends on whether CADET replaces
 * handler closures with the channel closure -- confirm.
 *
 * NOTE(review): the inner `struct OperationRequestMessage *msg` shadows
 * the function parameter `msg` for the rest of that block.
 *
 * NOTE(review): when the envelope is NULL (context too large) the function
 * returns after the operation was already linked into the set and the
 * channel was created; the cleanup on that path is not visible here.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the evaluate message
 */
4949 static void
4951  const struct GNUNET_SETU_EvaluateMessage *msg)
4952 {
4953  struct ClientState *cs = cls;
4954  struct Operation *op = GNUNET_new (struct Operation);
4955 
4956  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4957  GNUNET_MQ_hd_var_size (incoming_msg,
4959  struct OperationRequestMessage,
4960  op),
4961  GNUNET_MQ_hd_var_size (union_p2p_ibf,
4963  struct IBFMessage,
4964  op),
4965  GNUNET_MQ_hd_var_size (union_p2p_elements,
4968  op),
4969  GNUNET_MQ_hd_var_size (union_p2p_offer,
4971  struct GNUNET_MessageHeader,
4972  op),
4973  GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4975  struct InquiryMessage,
4976  op),
4977  GNUNET_MQ_hd_var_size (union_p2p_demand,
4979  struct GNUNET_MessageHeader,
4980  op),
4981  GNUNET_MQ_hd_fixed_size (union_p2p_done,
4983  struct GNUNET_MessageHeader,
4984  op),
4985  GNUNET_MQ_hd_fixed_size (union_p2p_over,
4987  struct GNUNET_MessageHeader,
4988  op),
4989  GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4991  struct GNUNET_MessageHeader,
4992  op),
4993  GNUNET_MQ_hd_var_size (union_p2p_request_full,
4995  struct TransmitFullMessage,
4996  op),
4997  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4999  struct StrataEstimatorMessage,
5000  op),
5001  GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5003  struct StrataEstimatorMessage,
5004  op),
5005  GNUNET_MQ_hd_var_size (union_p2p_full_element,
5008  op),
5009  GNUNET_MQ_hd_var_size (union_p2p_send_full,
5011  struct TransmitFullMessage,
5012  NULL),
5014  };
5015  struct Set *set;
5016  const struct GNUNET_MessageHeader *context;
5017 
5018  if (NULL == (set = cs->set))
5019  {
5020  GNUNET_break (0);
5021  GNUNET_free (op);
5023  return;
5024  }
5026  UINT32_MAX);
5027  op->peer = msg->target_peer;
5028  op->client_request_id = ntohl (msg->request_id);
5029  op->byzantine = msg->byzantine;
5030  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5031  op->force_full = msg->force_full;
5032  op->force_delta = msg->force_delta;
5033  op->symmetric = msg->symmetric;
5038  op->active_passive_switch_required = false;
5039  context = GNUNET_MQ_extract_nested_mh (msg);
5040 
5041  /* create hashmap for message control */
5043  GNUNET_NO);
5045 
5046 #if MEASURE_PERFORMANCE
5047  /* load config */
5048  load_config (op);
5049 #endif
5050 
5051  /* Advance generation values, so that
5052  mutations won't interfere with the running operation. */
5053  op->set = set;
5054  op->generation_created = set->current_generation;
5055  advance_generation (set);
5056  GNUNET_CONTAINER_DLL_insert (set->ops_head,
5057  set->ops_tail,
5058  op);
5060  "Creating new CADET channel to port %s for set union\n",
5061  GNUNET_h2s (&msg->app_id));
5062  op->channel = GNUNET_CADET_channel_create (cadet,
5063  op,
5064  &msg->target_peer,
5065  &msg->app_id,
5067  &channel_end_cb,
5068  cadet_handlers);
5069  op->mq = GNUNET_CADET_get_mq (op->channel);
5070  {
5071  struct GNUNET_MQ_Envelope *ev;
5072  struct OperationRequestMessage *msg;
5073 
5074 #if MEASURE_PERFORMANCE
5075  perf_store.operation_request.sent += 1;
5076 #endif
5077  ev = GNUNET_MQ_msg_nested_mh (msg,
5079  context);
5080  if (NULL == ev)
5081  {
5082  /* the context message is too large */
5083  GNUNET_break (0);
5085  return;
5086  }
5088  GNUNET_NO);
5089  /* copy the current generation's strata estimator for this operation */
5090  op->se = strata_estimator_dup (op->set->se);
5091  /* we started the operation, thus we have to send the operation request */
5092  op->phase = PHASE_EXPECT_SE;
5093 
5094  op->salt_receive = (op->peer_site + 1) % 2;
5095  op->salt_send = op->peer_site; // FIXME?????
5096 
5097 
5099  "Initiating union operation evaluation\n");
5100  GNUNET_STATISTICS_update (_GSS_statistics,
5101  "# of total union operations",
5102  1,
5103  GNUNET_NO);
5104  GNUNET_STATISTICS_update (_GSS_statistics,
5105  "# of initiated union operations",
5106  1,
5107  GNUNET_NO);
5108  GNUNET_MQ_send (op->mq,
5109  ev);
5110  if (NULL != context)
5112  "sent op request with context message\n");
5113  else
5115  "sent op request without context message\n");
5118  op->key_to_element);
5119 
5120  }
5122 }
5123 
5124 
/**
 * Handle a client's request to cancel one of its running operations.
 *
 * Fails if the client has no set. Otherwise searches the set's operation
 * list for an operation whose client_request_id equals the request id in
 * the message. Not finding one is only logged, since the operation may
 * already have been destroyed when the other peer disconnected.
 *
 * NOTE(review): the function-name line (5132), the log heads, the
 * statement at 5172 that acts on the found operation and the calls that
 * continue or drop the client are absent from this extract. Presumably
 * this is handle_client_cancel, matching the `client_cancel` service
 * handler.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the cancel message; supplies the request id
 */
5131 static void
5133  const struct GNUNET_SETU_CancelMessage *msg)
5134 {
5135  struct ClientState *cs = cls;
5136  struct Set *set;
5137  struct Operation *op;
5138  int found;
5139 
5140  if (NULL == (set = cs->set))
5141  {
5142  /* client without a set requested an operation */
5143  GNUNET_break (0);
5145  return;
5146  }
5147  found = GNUNET_NO;
/* On a match the loop breaks with `op` pointing at the operation. */
5148  for (op = set->ops_head; NULL != op; op = op->next)
5149  {
5150  if (op->client_request_id == ntohl (msg->request_id))
5151  {
5152  found = GNUNET_YES;
5153  break;
5154  }
5155  }
5156  if (GNUNET_NO == found)
5157  {
5158  /* It may happen that the operation was already destroyed due to
5159  * the other peer disconnecting. The client may not know about this
5160  * yet and try to cancel the (just barely non-existent) operation.
5161  * So this is not a hard error.
5162  *///
5164  "Client canceled non-existent op %u\n",
5165  (uint32_t) ntohl (msg->request_id));
5166  }
5167  else
5168  {
5170  "Client requested cancel for op %u\n",
5171  (uint32_t) ntohl (msg->request_id));
5173  }
5175 }
5176 
5177 
/**
 * Handle a client's acceptance of a suggested incoming operation.
 *
 * Fails if the client has no set. Looks the operation up by the
 * accept/reject id; if it no longer exists, a result message with status
 * GNUNET_SETU_STATUS_FAILURE is sent to the client. Otherwise the
 * operation is detached from its listener, attached to the client's set,
 * configured from the message options, the set generation is advanced,
 * the set's strata estimator is duplicated and serialized into a buffer,
 * and a strata estimator message carrying the set size and `se_count` is
 * sent to the peer. The operation then moves to PHASE_EXPECT_IBF.
 *
 * NOTE(review): many listing lines are absent from this extract (the
 * function-name line 5187, log heads, the head of the DLL removal from the
 * listener, the salt/peer_site setup, the message-control map creation,
 * the buffer allocation head, the compression-dependent message type
 * selection at 5315-5319 and the trailing CADET/service continuation
 * calls), so those steps cannot be confirmed from here.
 *
 * NOTE(review): the inner block declares its own `ev`, shadowing the
 * function-level `ev` used on the failure path above.
 *
 * @param cls the `struct ClientState` of the accepting client
 * @param msg the accept message
 */
5186 static void
5188  const struct GNUNET_SETU_AcceptMessage *msg)
5189 {
5190  struct ClientState *cs = cls;
5191  struct Set *set;
5192  struct Operation *op;
5193  struct GNUNET_SETU_ResultMessage *result_message;
5194  struct GNUNET_MQ_Envelope *ev;
5195  struct Listener *listener;
5196 
5197  if (NULL == (set = cs->set))
5198  {
5199  /* client without a set requested to accept */
5200  GNUNET_break (0);
5202  return;
5203  }
5204  op = get_incoming (ntohl (msg->accept_reject_id));
5205  if (NULL == op)
5206  {
5207  /* It is not an error if the set op does not exist -- it may
5208  * have been destroyed when the partner peer disconnected. */
5209  GNUNET_log (
5211  "Client %p accepted request %u of listener %p that is no longer active\n",
5212  cs,
5213  ntohl (msg->accept_reject_id),
5214  cs->listener);
5215  ev = GNUNET_MQ_msg (result_message,
5217  result_message->request_id = msg->request_id;
5218  result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5219  GNUNET_MQ_send (set->cs->mq, ev);
5221  return;
5222  }
5224  "Client accepting request %u\n",
5225  (uint32_t) ntohl (msg->accept_reject_id));
/* Ownership of the operation moves from the listener to the set. */
5226  listener = op->listener;
5227  op->listener = NULL;
5229  listener->op_tail,
5230  op);
5231  op->set = set;
5232  GNUNET_CONTAINER_DLL_insert (set->ops_head,
5233  set->ops_tail,
5234  op);
5235  op->client_request_id = ntohl (msg->request_id);
5236  op->byzantine = msg->byzantine;
5237  op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5238  op->force_full = msg->force_full;
5239  op->force_delta = msg->force_delta;
5240  op->symmetric = msg->symmetric;
5245  op->active_passive_switch_required = false;
5246  /* create hashmap for message control */
5248  GNUNET_NO);
5250 
5251 #if MEASURE_PERFORMANCE
5252  /* load config */
5253  load_config (op);
5254 #endif
5255 
5256  /* Advance generation values, so that future mutations do not
5257  interfere with the running operation. */
5258  op->generation_created = set->current_generation;
5259  advance_generation (set);
5260  GNUNET_assert (NULL == op->se);
5261 
5263  "accepting set union operation\n");
5264  GNUNET_STATISTICS_update (_GSS_statistics,
5265  "# of accepted union operations",
5266  1,
5267  GNUNET_NO);
5268  GNUNET_STATISTICS_update (_GSS_statistics,
5269  "# of total union operations",
5270  1,
5271  GNUNET_NO);
5272  {
5273  struct MultiStrataEstimator *se;
5274  struct GNUNET_MQ_Envelope *ev;
5275  struct StrataEstimatorMessage *strata_msg;
5276  char *buf;
5277  size_t len;
5278  uint16_t type;
5279 
5280  op->se = strata_estimator_dup (op->set->se);
5282  GNUNET_NO);
5283  op->salt_receive = (op->peer_site + 1) % 2;
5284  op->salt_send = op->peer_site; // FIXME?????
5287  op->key_to_element);
5288 
5289  /* kick off the operation */
5290  se = op->se;
5291 
/* Default to a single strata estimator; recomputed below when the
   operation has a non-zero initial size. */
5292  uint8_t se_count = 1;
5293  if (op->initial_size > 0)
5294  {
5295  op->total_elements_size_local = 0;
5297  &
5299  op);
5300  se_count = determine_strata_count (
5302  op->initial_size);
5303  }
5305  * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5306  len = strata_estimator_write (se,
5308  se_count,
5309  buf);
5310 #if MEASURE_PERFORMANCE
5311  perf_store.se.sent += 1;
5312  perf_store.se.sent_var_bytes += len;
5313 #endif
5314 
5315  if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5318  else
5320  ev = GNUNET_MQ_msg_extra (strata_msg,
5321  len,
5322  type);
5323  GNUNET_memcpy (&strata_msg[1],
5324  buf,
5325  len);
5326  GNUNET_free (buf);
5327  strata_msg->set_size
5329  op->set->content->elements));
5330  strata_msg->se_count = se_count;
5331  GNUNET_MQ_send (op->mq,
5332  ev);
5333  op->phase = PHASE_EXPECT_IBF;
5334  }
5335  /* Now allow CADET to continue, as we did not do this in
5336  #handle_incoming_msg (as we wanted to first see if the
5337  local client would accept the request). */
5340 }
5341 
5342 
/**
 * Task run on shutdown. If no clients remain, disconnects from CADET right
 * away; otherwise the CADET handle is left for the last client's disconnect
 * handler to release. Then destroys the statistics handle and, when
 * MEASURE_PERFORMANCE is enabled, calls calculate_perf_store().
 *
 * NOTE(review): listing line 5352 (the statement following the "delay"
 * comment, presumably setting the shutdown flag) and the log head at 5363
 * are absent from this extract.
 *
 * @param cls closure (unused)
 */
5348 static void
5349 shutdown_task (void *cls)
5350 {
5351  /* Delay actual shutdown to allow service to disconnect clients */
5353  if (0 == num_clients)
5354  {
5355  if (NULL != cadet)
5356  {
5357  GNUNET_CADET_disconnect (cadet);
5358  cadet = NULL;
5359  }
5360  }
5361  GNUNET_STATISTICS_destroy (_GSS_statistics,
5362  GNUNET_YES);
5364  "handled shutdown request\n");
5365 #if MEASURE_PERFORMANCE
5366  calculate_perf_store ();
5367 #endif
5368 }
5369 
5370 
/**
 * Service start-up: creates the "setu" statistics handle and connects to
 * the CADET service. If the CADET connection fails, an error is logged and
 * the function returns early.
 *
 * NOTE(review): listing lines 5382 (third parameter), 5387 (head of the
 * call that takes NULL as its last argument, presumably the shutdown-task
 * registration), 5394 (log head) and 5396 (statement before the early
 * return) are absent from this extract.
 *
 * @param cls closure (unused in the visible code)
 * @param cfg configuration to use
 */
5379 static void
5380 run (void *cls,
5381  const struct GNUNET_CONFIGURATION_Handle *cfg,
5383 {
5384  /* FIXME: need to modify SERVICE (!) API to allow
5385  us to run a shutdown task *after* clients were
5386  forcefully disconnected! */
5388  NULL);
5389  _GSS_statistics = GNUNET_STATISTICS_create ("setu",
5390  cfg);
5391  cadet = GNUNET_CADET_connect (cfg);
5392  if (NULL == cadet)
5393  {
5395  _ ("Could not connect to CADET service\n"));
5397  return;
5398  }
5399 }
5400 
5401 
/*
 * Service definition: registers run() as the start-up function and the
 * client message handlers (accept, set_add, create_set, evaluate, listen,
 * reject, cancel).
 *
 * NOTE(review): the macro head (listing line 5405), the option/callback
 * arguments at 5407 and 5409-5410, the message type and struct arguments
 * of every handler, and the terminator at 5440 are absent from this
 * extract.
 *
 * NOTE(review): the service name literal here is "set" while the log
 * component and statistics handle in this file use "setu" -- confirm which
 * name is intended.
 */
5406  "set",
5408  &run,
5411  NULL,
5412  GNUNET_MQ_hd_fixed_size (client_accept,
5415  NULL),
5416  GNUNET_MQ_hd_var_size (client_set_add,
5419  NULL),
5420  GNUNET_MQ_hd_fixed_size (client_create_set,
5423  NULL),
5424  GNUNET_MQ_hd_var_size (client_evaluate,
5427  NULL),
5428  GNUNET_MQ_hd_fixed_size (client_listen,
5431  NULL),
5432  GNUNET_MQ_hd_fixed_size (client_reject,
5435  NULL),
5436  GNUNET_MQ_hd_fixed_size (client_cancel,
5439  NULL),
5441 
5442 
5443 /* end of gnunet-service-setu.c */
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
Context for op_get_element_iterator.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
uint32_t offset
Offset of the strata in the rest of the message.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
uint8_t mode_of_operation
Mode of operation that was chosen by the algorithm.
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
struct Set * prev
Sets are held in a doubly linked list.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet...
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:55
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:775
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
uint8_t ibf_bucket_number_factor
Set difference is multiplied by this factor to generate a large enough IBF.
struct Set * set
Set, if associated with the client, otherwise NULL.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
uint64_t set_size
Size of the local set.
After sending the full set, wait for responses with the elements that the local peer is missing...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
Struct to track messages in message control flow.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
State we keep per client.
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
uint64_t local_set_diff
Estimated or committed set difference at the start.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
Track that a message has been sent.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
uint64_t local_element_count
Local peer element count.
enum IntersectionOperationPhase phase
Current state of the operation.
static unsigned int element_size
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
static int byzantine
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
If a value with the given key exists, replace it.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
struct MultiStrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
struct Operation * op
Operation for which the elements should be sent.
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
uint32_t element_count
For Intersection: my element count.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local,)
uint8_t differential_sync_iterations
is the count of already passed differential sync iterations
struct GNUNET_CONTAINER_MultiHashMap * message_control_flow
Hashmap to keep track of the sent/received messages.
Opaque handle to the service.
Definition: cadet_api.c:38
Handle to a service.
Definition: service.c:116
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct...
int byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct GNUNET_HashCode app_id
Application id.
Definition: setu.h:223
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
const void * data
Actual data of the element.
uint8_t ibf_get_max_counter(struct InvertibleBloomFilter *ibf)
Returns the minimal bytes needed to store the counter of the IBF.
Definition: ibf.c:298
uint8_t symmetric
Also return set elements we are sending to the remote peer.
Definition: setu.h:246
uint16_t reserved
For alignment, always zero.
Definition: setu.h:340
uint32_t accept_reject_id
ID of the incoming request we want to reject.
Definition: setu.h:167
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Random on unsigned 64-bit values.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1331
struct GNUNET_HashCode element_hash
Hash of the element.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
unsigned int refcount
Number of references to the content.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_CONTAINER_MultiHashMap * inquiries_sent
Hashmap to keep track of the sent/received inquiries (ibf keys)
int remote_decoded_count
If an IBF is decoded this count stores how many elements are on the remote site.
Definition: ibf.h:108
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
Message containing buckets of an invertible bloom filter.
struct Listener * next
Listeners are held in a doubly linked list.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
uint64_t ibf_bucket_number_factor
The factor determines the number of buckets an IBF has which is multiplied by the estimated setsize d...
Definition: setu.h:270
The other peer is decoding the IBF we just sent.
uint64_t byzantine_upper_bond
Upper bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:258
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
uint32_t request_id
ID of the request we want to cancel.
Definition: setu.h:359
Invertible bloom filter (IBF).
Definition: ibf.h:82
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE(C)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
uint64_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
UnionOperationPhase
Current phase we are in for a union operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
uint64_t total_elements_size_local
Total size of local set.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
uint32_t accept_reject_id
ID of the incoming request we want to accept.
Definition: setu.h:88
uint32_t ibf_size
Size of the whole ibf (number of buckets)
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
uint8_t ibf_number_buckets_per_element
Number of buckets per element in the IBF.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
Signals other peer that all elements are sent.
struct KeyEntry * k
FIXME.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
uint64_t byzantine_upper_bond
Upper bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:130
uint64_t initial_size
Initial size of our set, just before the operation started.
Opaque handle to a channel.
Definition: cadet.h:116
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
Definition: crypto_hash.c:321
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
Track that message has been received.
uint64_t key_val
Definition: ibf.h:47
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(void)
Create a new configuration object.
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MulitHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
Message which signals to other peer that we are sending full set.
A listener is inhabited by a client, and waits for evaluation requests from remote peers...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
uint32_t request_id
id the result belongs to
Definition: setu.h:304
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:290
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL)...
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
uint32_t request_id
Request ID to identify responses.
Definition: setu.h:93
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
uint64_t rtt_bandwidth_tradeoff
User defined Bandwidth Round Trips Tradeoff.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:111
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:531
Handle for the service.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static unsigned int force_full
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
int GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:315
Message sent by the client to the service to ask starting a new set to perform operations with...
Definition: setu.h:40
Internal representation of the hash map.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
uint16_t ibf_counter_bit_length
The bit length of the counter.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up...
unsigned int generation_created
Generation in which the operation handle was created.
static unsigned int num_clients
Number of active clients.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected...
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
uint64_t remote_set_diff
Estimated or committed set difference at the start.
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
static pa_context * context
Pulseaudio context.
const void * data
Actual data of the element.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:910
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
bool active_passive_switch_required
Boolean to enforce an active passive switch.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
uint32_t number
uint64_t ibf_bucket_number_factor
The factor determines the number of buckets an IBF has which is multiplied by the estimated setsize d...
Definition: setu.h:142
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
uint32_t remote_set_difference
Remote set difference calculated with strata estimator.
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:235
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
The key entry is used to associate an ibf key with an element.
Handle to a client that is connected to a service.
Definition: service.c:250
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
uint64_t ibf_number_of_buckets_per_element
This setting determines how many IBF buckets a single element is mapped to.
Definition: setu.h:148
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
uint64_t bandwidth_latency_tradeoff
Bandwidth latency tradeoff determines how many bytes a single RTT is worth, which is a performance se...
Definition: setu.h:264
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
uint8_t symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
Definition: setu.h:117
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:379
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
Mode just synchronizes the difference between sets.
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:335
uint64_t bandwidth_latency_tradeoff
Bandwidth latency tradeoff determines how many bytes a single RTT is worth, which is a performance se...
Definition: setu.h:136
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
Message sent by client to service to initiate a set operation as a client (not as listener)...
Definition: setu.h:202
static unsigned int ibf_size
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation...
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
The other peer refused to do the operation with us, or something went wrong.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
GNUNET_SETU_Status
Status for the result callback.
uint32_t remote_set_size
Total remote set size.
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:970
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
Don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
enum MESSAGE_CONTROL_FLOW_STATE element
Track the message control state of the element message.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static char * value
Value of the record to add/remove.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
Information about an element in the set.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
Success, all elements have been sent (and received).
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set...
uint16_t element_type
Application-specific element type.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:228
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
Element should be added to the result set of the local peer, i.e.
We sent the strata estimator, and expect an IBF.
A set that supports a specific operation with other peers.
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: setu.h:185
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer...
struct ElementEntry * element
The actual element associated with the key.
Randomness for IVs etc.
uint64_t byzantine_upper_bound
Limit of number of elements in set.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:90
uint16_t status
See PRISM_STATUS_*-constants.
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
static char buf[2048]
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static int check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates that a message is received in a correct phase.
Continuation for multi part IBFs.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set...
In the penultimate phase, we wait until all our demands are satisfied.
Element message type.
#define IBF_MIN_SIZE
Minimal size of an ibf Based on the bsc thesis of Elias Summermatter (2021)
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:252
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:356
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
unsigned int strata_count
Size of the IBF array in strata.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
uint64_t ibf_number_of_buckets_per_element
This setting determines how many IBF buckets a single element is mapped to.
Definition: setu.h:276
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
uint64_t elements_randomized_salt
Salt to construct the randomized element map.
Internal representation of the hash map.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
MESSAGE_TYPE
Message types to track in message control flow.
A 512-bit hashcode.
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:310
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:78
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2325
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:229
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:916
Message handler for a specific message type.
static int res
Initial message state.
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint8_t byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
Definition: setu.h:241
#define LOG(kind,...)
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
uint8_t peer_site
Defines which side a client is on: 0 = initiating peer, 1 = receiving peer.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
int force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Demand message type.
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:323
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
uint64_t current_size
Current set size.
Definition: setu.h:299
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space...
Definition: gnunet_mq_lib.h:88
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
Definition: crypto_hash.c:340
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:403
struct ClientState * cs
Client that owns the set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set...
char * getenv()
struct GNUNET_CONTAINER_MultiHashMap * elements_randomized
Maps struct GNUNET_HashCode * to struct ElementEntry * randomized.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
Hash of an IBF key.
Definition: ibf.h:54
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries...
Strata estimator together with the peer's overall set size.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:79
static struct GNUNET_SCHEDULER_Task * timeout_task
Task to be run on timeout.
Definition: gnunet-arm.c:124
Message sent by client to the service to add an element to the set.
Definition: setu.h:325
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
int client_done_sent
Did we send the client that we are done?
struct GNUNET_PeerIdentity target_peer
Peer to evaluate the operation with.
Definition: setu.h:218
Operation context used to execute a set operation.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
A request for an operation with another client.
Definition: setu.h:174
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
enum MESSAGE_CONTROL_FLOW_STATE demand
Track the message control state of the demand message.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
uint32_t request_id
Id of our set to evaluate, chosen implicitly by the client when it calls GNUNET_SETU_commit().
Definition: setu.h:213
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
enum MESSAGE_CONTROL_FLOW_STATE offer
Track the message control state of the offer message.
struct InvertibleBloomFilter ** strata
The IBFs of this strata estimator.
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
Allow multiple values with the same key.
unsigned int generation
First generation that includes this element.
uint8_t se_count
The number of ses transmitted.
int local_decoded_count
If an IBF is decoded this count stores how many elements are on the local site.
Definition: ibf.h:101
struct MultiStrataEstimator * se
The strata estimator is only generated once for each set.
Handle to a message queue.
Definition: mq.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
MODE_OF_OPERATION
Different modes of operations.
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:167
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
The identity of the host (wraps the signing key of the peer).
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:190
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:888
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries...
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate whether a message in differential sync has already been received before.
uint8_t force_full
Always send full sets, even if delta operations would be more efficient.
Definition: setu.h:105
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1031
struct GNUNET_CADET_Channel * channel
Channel to the peer.
We are decoding an IBF.
uint32_t salt
Salt used when hashing elements for this IBF.
struct Operation * ops_head
Evaluate operations are held in a linked list.
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:289
configuration data
uint32_t salt
Salt to use for the operation.
struct GNUNET_SERVICE_Client * client
Client this is about.
Element should be added to the result set of the remote peer, i.e.
const char * name
uint32_t client_request_id
ID used to identify an operation between service and client.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:201
Track that receiving this message is expected.
uint32_t local_set_difference
Local set difference calculated with strata estimator.
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:349
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET...
uint16_t size
Number of bytes in the buffer pointed to by data.
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
#define GNUNET_log(kind,...)
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
Entry in list of pending tasks.
Definition: scheduler.c:134
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
struct GNUNET_SET_Element element
The actual element.
Opaque handle to a port.
Definition: cadet_api.c:79
Mode send full set sending local set first.
Phase that receives full set first and then sends elements that are the local peer missing...
struct Listener * prev
Listeners are held in a doubly linked list.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
Offer message type.
struct StrataEstimator ** stratas
Array of strata estimators.
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
Header for all communications.
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port().
Definition: cadet_api.c:808
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
uint8_t force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
Definition: setu.h:99
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:330
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
struct ClientState * cs
Client that owns the listener.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
uint32_t remote_element_count
Remote peer's element count.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:45
uint32_t salt
Salt used when hashing elements for this inquiry.
uint32_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
Definition: setu.h:123
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
uint64_t ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
uint32_t salt_send
Salt that we're using for sending IBFs.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:837
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
We sent the request message, and expect a strata estimator.
struct Listener * listener
Port this operation runs on.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2244
uint16_t size
Number of bytes in the buffer pointed to by data.