GNUnet 0.28.0-dev.3-20-gf1136b0b8
 
Loading...
Searching...
No Matches
gnunet-service-setu.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
30#include "ibf.h"
31#include "gnunet_protocols.h"
36#include <gcrypt.h>
37#include "gnunet_setu_service.h"
38#include "setu.h"
39
40#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
41
46#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
47
51#define SE_STRATA_COUNT 32
52
53
58#define SE_IBFS_TOTAL_SIZE 632
59
63#define SE_IBF_HASH_NUM 3
64
68#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
69
75#define MAX_IBF_SIZE 1048576
76
77
82#define IBF_MIN_SIZE 37
83
88#define DIFFERENTIAL_RTT_MEAN 3.65145
89
94#define SECURITY_LEVEL 80
95
101#define PROBABILITY_FOR_NEW_ROUND 0.15
102
107#define MEASURE_PERFORMANCE 0
108
109
176
198
199
206struct ElementEntry
207{
213
219
223 unsigned int generation;
224
229 int remote;
230};
231
232
237struct Listener;
238
239
243struct Set;
244
245
249struct ClientState
250{
254 struct Set *set;
255
259 struct Listener *listener;
260
265
269 struct GNUNET_MQ_Handle *mq;
270};
271
272
276struct Operation
277{
278
284
288 uint64_t initial_size;
289
293 struct Operation *next;
294
298 struct Operation *prev;
299
304
308 struct Listener *listener;
309
313 struct GNUNET_MQ_Handle *mq;
314
319
324 struct Set *set;
325
331
336
341
348
354
359
364
369
374
378 uint32_t salt_send;
379
383 uint32_t salt_receive;
384
390
395
399 uint32_t salt;
400
404 uint32_t remote_element_count;
405
409 uint32_t client_request_id;
410
415 int force_delta;
416
421 int force_full;
422
427 int byzantine;
428
434
440
446 uint32_t suggest_id;
447
452 unsigned int generation_created;
453
454
459
460
465
466
472
478 uint8_t peer_site;
479
480
485
490
495
500
501
506
511
516
521
526
531};
532
533
538struct SetContent
539{
544
549
554
558 unsigned int refcount;
559
563 unsigned int latest_generation;
564
568 int iterator_count;
569};
570
571
575struct Set
576{
580 struct Set *next;
581
585 struct Set *prev;
586
591 struct ClientState *cs;
592
597 struct SetContent *content;
598
604
608 struct Operation *ops_head;
609
613 struct Operation *ops_tail;
614
619 unsigned int current_generation;
620
621};
622
623
627struct KeyEntry
628{
632 struct IBF_Key ibf_key;
633
640 struct ElementEntry *element;
641
647 int received;
648};
649
650
656{
661 struct IBF_Key ibf_key;
662
667 struct Operation *op;
668};
669
670
675struct Listener
676{
680 struct Listener *next;
681
685 struct Listener *prev;
686
692 struct Operation *op_head;
693
699 struct Operation *op_tail;
700
705 struct ClientState *cs;
706
711
716 struct GNUNET_HashCode app_id;
717
718};
719
720
726
731
735static struct Listener *listener_head;
736
740static struct Listener *listener_tail;
741
745static unsigned int num_clients;
746
751static int in_shutdown;
752
758static uint32_t suggest_id;
759
760#if MEASURE_PERFORMANCE
765static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
766
767
773struct perf_num_send_received_msg
774{
775 uint64_t sent;
776 uint64_t sent_var_bytes;
777 uint64_t received;
778 uint64_t received_var_bytes;
779};
780
784struct per_store_struct
785{
786 struct perf_num_send_received_msg operation_request;
787 struct perf_num_send_received_msg se;
788 struct perf_num_send_received_msg request_full;
789 struct perf_num_send_received_msg element_full;
790 struct perf_num_send_received_msg full_done;
791 struct perf_num_send_received_msg ibf;
792 struct perf_num_send_received_msg inquery;
793 struct perf_num_send_received_msg element;
794 struct perf_num_send_received_msg demand;
795 struct perf_num_send_received_msg offer;
796 struct perf_num_send_received_msg done;
797 struct perf_num_send_received_msg over;
798 uint64_t se_diff;
799 uint64_t se_diff_remote;
800 uint64_t se_diff_local;
801 uint64_t active_passive_switches;
802 uint8_t mode_of_operation;
803};
804
805struct per_store_struct perf_store;
806#endif
807
834
856
857
876
877
878#if MEASURE_PERFORMANCE
879
885static void
886load_config (struct Operation *op)
887{
888 long long number;
889 float fl;
890
891 setu_cfg = GNUNET_CONFIGURATION_create ();
893 "perf_setu.conf");
895 "IBF",
896 "BUCKET_NUMBER_FACTOR",
897 &fl);
898 op->ibf_bucket_number_factor = fl;
900 "IBF",
901 "NUMBER_PER_BUCKET",
902 &number);
903 op->ibf_number_buckets_per_element = number;
905 "PERFORMANCE",
906 "TRADEOFF",
907 &number);
908 op->rtt_bandwidth_tradeoff = number;
910 "BOUNDARIES",
911 "UPPER_ELEMENT",
912 &number);
913 op->byzantine_upper_bound = number;
914 op->peer_site = 0;
915}
916
917
924static int
925sum_sent_received_bytes (uint64_t size,
926 struct perf_num_send_received_msg
927 perf_num_send_received_msg)
928{
929 return (size * perf_num_send_received_msg.sent)
930 + (size * perf_num_send_received_msg.received)
931 + perf_num_send_received_msg.sent_var_bytes
932 + perf_num_send_received_msg.received_var_bytes;
933}
934
935
939static void
940calculate_perf_store ()
941{
942
946 float rtt = 1;
947 int bytes_transmitted = 0;
948
952 if ((perf_store.element_full.received != 0) ||
953 (perf_store.element_full.sent != 0)
954 )
955 rtt += 1;
956
957 if ((perf_store.request_full.received != 0) ||
958 (perf_store.request_full.sent != 0)
959 )
960 rtt += 0.5;
961
966 if ((perf_store.element.received != 0) ||
967 (perf_store.element.sent != 0))
968 {
969 int iterations = perf_store.active_passive_switches;
970
971 if (iterations > 0)
972 rtt += iterations * 0.5;
973 rtt += 2.5;
974 }
975
976
980 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
982 perf_store.request_full);
983
984 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
986 perf_store.element_full);
987 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
989 perf_store.element);
990 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
991 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
993 perf_store.se);
994 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
995 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
996 perf_store.ibf);
997 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
998 perf_store.inquery);
999 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1001 perf_store.demand);
1002 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1004 perf_store.offer);
1005 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1006
1010 float factor;
1011 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1012 &factor);
1013 long long num_per_bucket;
1014 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1015 &num_per_bucket);
1016
1017
1018 int decoded = 0;
1019 if (perf_store.active_passive_switches == 0)
1020 decoded = 1;
1021 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1022 IBFMessage),
1023 perf_store.ibf);
1024
1025 FILE *out1 = fopen ("perf_data.csv", "a");
1026 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1027 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1028 bytes_transmitted,
1029 perf_store.se_diff_local,perf_store.se_diff_remote,
1030 perf_store.mode_of_operation);
1031 fclose (out1);
1032
1033}
1034
1035
1036#endif
1049static uint8_t
1050estimate_best_mode_of_operation (uint64_t avg_element_size,
1051 uint64_t local_set_size,
1052 uint64_t remote_set_size,
1053 uint64_t est_set_diff_remote,
1054 uint64_t est_set_diff_local,
1055 uint64_t bandwith_latency_tradeoff,
1056 uint64_t ibf_bucket_number_factor)
1057{
1058
1059 /*
1060 * In case of initial sync fall to predefined states
1061 */
1062 {
1063 if (0 == local_set_size)
1065 if (0 == remote_set_size)
1067 }
1068 /*
1069 * Calculate bytes for full Sync
1070 */
1071 {
1072 uint8_t sizeof_full_done_header = 4;
1073 uint8_t sizeof_done_header = 4;
1074 uint8_t rtt_min_full = 2;
1075 uint8_t sizeof_request_full = 4;
1076 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1077
1078 /* Estimate byte required if we send first */
1079 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1080 + local_set_size;
1081
1082 uint64_t total_bytes_full_local_send_first = (avg_element_size
1083 *
1084 total_elements_to_send_local_send_first) \
1085 + (
1086 total_elements_to_send_local_send_first * sizeof(struct
1088 ) \
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1103 + (sizeof_full_done_header * 2
1104 ) \
1105 + (rtt_min_full + 0.5)
1106 * bandwith_latency_tradeoff \
1107 + sizeof_request_full;
1108
1109 /*
1110 * Calculate bytes for differential Sync
1111 */
1112
1113 /* Estimate bytes required by IBF transmission*/
1114
1115 long double ibf_bucket_count = estimated_total_diff
1116 * ibf_bucket_number_factor;
1117
1118 if (ibf_bucket_count <= IBF_MIN_SIZE)
1119 {
1120 ibf_bucket_count = IBF_MIN_SIZE;
1121 }
1122 {
1123 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1124 / ((float) MAX_BUCKETS_PER_MESSAGE));
1125
1126 uint64_t estimated_counter_size = ceil (
1127 MIN (2 * log2l (((float) local_set_size)
1128 / ((float) ibf_bucket_count)),
1129 log2l (local_set_size)));
1130
1131 long double counter_bytes = (float) estimated_counter_size / 8;
1132
1133 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count
1134 )
1135 * 1.2 \
1136 + (ibf_bucket_count * sizeof(struct IBF_Key))
1137 * 1.2 \
1138 + (ibf_bucket_count * sizeof(struct
1139 IBF_KeyHash))
1140 * 1.2 \
1141 + (ibf_bucket_count * counter_bytes) * 1.2);
1142
1143 /* Estimate full byte count for differential sync */
1144 uint64_t element_size = (avg_element_size
1145 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1146 * estimated_total_diff;
1147 uint64_t done_size = sizeof_done_header;
1148 uint64_t inquery_size = (sizeof (struct IBF_Key)
1149 + sizeof (struct InquiryMessage))
1150 * estimated_total_diff;
1151 uint64_t demand_size =
1152 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1153 * estimated_total_diff;
1154 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1155 + sizeof (struct GNUNET_MessageHeader))
1156 * estimated_total_diff;
1157
1158 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1159 + demand_size + offer_size + ibf_bytes) \
1161 * bandwith_latency_tradeoff);
1162
1163 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1164 total_bytes_full_remote_send_first);
1165
1166 /* Decide between full and differential sync */
1167
1168 if (full_min < total_bytes_diff)
1169 {
1170 /* Decide between sending all element first or receiving all elements */
1171 if (total_bytes_full_remote_send_first >
1172 total_bytes_full_local_send_first
1173 )
1174 {
1176 }
1177 else
1178 {
1180 }
1181 }
1182 else
1183 {
1184 return DIFFERENTIAL_SYNC;
1185 }
1186 }
1187 }
1188}
1189
1190
1199static enum GNUNET_GenericReturnValue
1200check_valid_phase (const uint8_t allowed_phases[],
1201 size_t size_phases,
1202 struct Operation *op)
1203{
1207 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1208 {
1209 uint8_t phase = allowed_phases[phase_ctr];
1210 if (phase == op->phase)
1211 {
1213 "Message received in valid phase\n");
1214 return GNUNET_YES;
1215 }
1216 }
1218 "Received message in invalid phase: %u\n", op->phase);
1219 return GNUNET_NO;
1220}
1221
1222
1234static int
1236 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1237 const struct GNUNET_HashCode *hash_code,
1238 enum MESSAGE_TYPE mt)
1239{
1240 struct messageControlFlowElement *cfe = NULL;
1241 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1242
1247 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1248 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1249 {
1250 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1251 {
1253 "Received an element without sent offer!\n");
1254 return GNUNET_NO;
1255 }
1256 /* Check that only requested elements are received! */
1257 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1258 MSG_CFS_SENT))
1259 {
1261 "Received an element that was not demanded\n");
1262 return GNUNET_NO;
1263 }
1264 }
1265
1270 if (NULL == cfe)
1271 {
1273 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1274 cfe,
1276 {
1277 GNUNET_free (cfe);
1278 return GNUNET_SYSERR;
1279 }
1280 }
1281
1286 if (OFFER_MESSAGE == mt)
1287 {
1288 mcfs = &cfe->offer;
1289 }
1290 else if (DEMAND_MESSAGE == mt)
1291 {
1292 mcfs = &cfe->demand;
1293 }
1294 else if (ELEMENT_MESSAGE == mt)
1295 {
1296 mcfs = &cfe->element;
1297 }
1298 else
1299 {
1300 return GNUNET_SYSERR;
1301 }
1302
1307 if (new_mcfs <= *mcfs)
1308 {
1309 return GNUNET_NO;
1310 }
1311
1312 *mcfs = new_mcfs;
1313 return GNUNET_YES;
1314}
1315
1316
1324static int
1327 struct GNUNET_HashCode *hash_code,
1328 enum MESSAGE_TYPE mt)
1329{
1330 struct messageControlFlowElement *cfe = NULL;
1331 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1332
1333 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1334
1339 if (cfe != NULL)
1340 {
1341 if (OFFER_MESSAGE == mt)
1342 {
1343 mcfs = &cfe->offer;
1344 }
1345 else if (DEMAND_MESSAGE == mt)
1346 {
1347 mcfs = &cfe->demand;
1348 }
1349 else if (ELEMENT_MESSAGE == mt)
1350 {
1351 mcfs = &cfe->element;
1352 }
1353 else
1354 {
1355 return GNUNET_SYSERR;
1356 }
1357
1361 if (*mcfs != MSG_CFS_UNINITIALIZED)
1362 {
1363 return GNUNET_YES;
1364 }
1365 }
1366 return GNUNET_NO;
1367}
1368
1369
1380static int
1382 const struct GNUNET_HashCode *key,
1383 void *value)
1384{
1385 struct messageControlFlowElement *mcfe = value;
1386
1387 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1388 {
1389 return GNUNET_YES;
1390 }
1391 return GNUNET_NO;
1392}
1393
1394
1404static int
1406 const struct GNUNET_HashCode *key,
1407 void *value)
1408{
1409 struct Operation *op = cls;
1410 struct GNUNET_SETU_Element *element = value;
1411 op->total_elements_size_local += element->size;
1412 return GNUNET_YES;
1413}
1414
1415
1425static int
1427 const struct GNUNET_HashCode *key,
1428 void *value)
1429{
1430 struct Operation *op = cls;
1431
1432 struct GNUNET_HashContext *hashed_key_context =
1434 struct GNUNET_HashCode new_key;
1435
1439 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1440 &key,
1441 sizeof(struct IBF_Key));
1442 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1443 &op->set->content->elements_randomized_salt,
1444 sizeof(uint32_t));
1445 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1446 &new_key);
1447 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1448 &new_key,value,
1450 return GNUNET_YES;
1451}
1452
1453
1464static int
1466 uint32_t key,
1467 void *value)
1468{
1469 struct KeyEntry *k = value;
1470
1471 GNUNET_assert (NULL != k);
1472 if (GNUNET_YES == k->element->remote)
1473 {
1474 GNUNET_free (k->element);
1475 k->element = NULL;
1476 }
1477 GNUNET_free (k);
1478 return GNUNET_YES;
1479}
1480
1481
1488static void
1490{
1491 struct Operation *op = cls;
1492 struct GNUNET_MQ_Envelope *ev;
1493 struct GNUNET_SETU_ResultMessage *rm;
1494
1495 if (GNUNET_YES == op->client_done_sent)
1496 return;
1497 if (PHASE_FINISHED != op->phase)
1498 {
1500 "Union operation failed\n");
1502 "# Union operations failed",
1503 1,
1504 GNUNET_NO);
1507 rm->request_id = htonl (op->client_request_id);
1508 rm->element_type = htons (0);
1509 GNUNET_MQ_send (op->set->cs->mq,
1510 ev);
1511 return;
1512 }
1513
1514 op->client_done_sent = GNUNET_YES;
1515
1517 "# Union operations succeeded",
1518 1,
1519 GNUNET_NO);
1521 "Signalling client that union operation is done\n");
1522 ev = GNUNET_MQ_msg (rm,
1524 rm->request_id = htonl (op->client_request_id);
1526 rm->element_type = htons (0);
1528 op->key_to_element));
1529 GNUNET_MQ_send (op->set->cs->mq,
1530 ev);
1531}
1532
1533
1540static int
1542{
1543 if (op->byzantine != GNUNET_YES)
1544 return GNUNET_OK;
1545
1549 if (op->remote_element_count + op->remote_set_diff >
1550 op->byzantine_upper_bound)
1551 return GNUNET_SYSERR;
1552 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1553 return GNUNET_SYSERR;
1554
1558 if (op->remote_element_count < op->byzantine_lower_bound)
1559 return GNUNET_SYSERR;
1560 return GNUNET_OK;
1561}
1562
1563
1564static enum GNUNET_GenericReturnValue
1566 const struct GNUNET_HashCode *key,
1567 void *value)
1568{
1570 return GNUNET_YES;
1571}
1572
1573
1574/* FIXME: the destroy logic is a mess and should be cleaned up! */
1575
1588static void
1590{
1591 struct Set *set = op->set;
1592 struct GNUNET_CADET_Channel *channel;
1593
1595 "Destroying union operation %p\n",
1596 op);
1597 GNUNET_assert (NULL == op->listener);
1598 /* check if the op was canceled twice */
1599 if (NULL != op->remote_ibf)
1600 {
1601 ibf_destroy (op->remote_ibf);
1602 op->remote_ibf = NULL;
1603 }
1604 if (NULL != op->demanded_hashes)
1605 {
1606 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1607 op->demanded_hashes = NULL;
1608 }
1609 if (NULL != op->local_ibf)
1610 {
1611 ibf_destroy (op->local_ibf);
1612 op->local_ibf = NULL;
1613 }
1614 if (NULL != op->se)
1615 {
1617 op->se = NULL;
1618 }
1619 if (NULL != op->key_to_element)
1620 {
1623 NULL);
1625 op->key_to_element = NULL;
1626 }
1627 if (NULL != op->message_control_flow)
1628 {
1629 GNUNET_CONTAINER_multihashmap_iterate (op->message_control_flow,
1631 NULL);
1632 GNUNET_CONTAINER_multihashmap_destroy (op->message_control_flow);
1633 op->message_control_flow = NULL;
1634 }
1635 if (NULL != op->inquiries_sent)
1636 {
1637 GNUNET_CONTAINER_multihashmap_destroy (op->inquiries_sent);
1638 op->inquiries_sent = NULL;
1639 }
1640 if (NULL != set)
1641 {
1643 set->ops_tail,
1644 op);
1645 op->set = NULL;
1646 }
1647 if (NULL != op->context_msg)
1648 {
1649 GNUNET_free (op->context_msg);
1650 op->context_msg = NULL;
1651 }
1652 if (NULL != (channel = op->channel))
1653 {
1654 /* This will free op; called conditionally as this helper function
1655 is also called from within the channel disconnect handler. */
1656 op->channel = NULL;
1658 }
1659 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1660 * there was a channel end handler that will free 'op' on the call stack. */
1661}
1662
1663
1669static void
1671
1672
1678static void
1680{
1681 struct Listener *listener;
1682
1684 "Destroying incoming operation %p\n",
1685 op);
1686 if (NULL != (listener = op->listener))
1687 {
1689 listener->op_tail,
1690 op);
1691 op->listener = NULL;
1692 }
1693 if (NULL != op->timeout_task)
1694 {
1695 GNUNET_SCHEDULER_cancel (op->timeout_task);
1696 op->timeout_task = NULL;
1697 }
1699}
1700
1701
1707static void
1709{
1710 struct GNUNET_CADET_Channel *channel;
1711
1712 if (NULL != (channel = op->channel))
1713 {
1714 /* This will free op; called conditionally as this helper function
1715 is also called from within the channel disconnect handler. */
1716 op->channel = NULL;
1718 }
1719 if (NULL != op->listener)
1720 {
1722 return;
1723 }
1724 if (NULL != op->set)
1727 GNUNET_free (op);
1728}
1729
1730
1737static void
1739{
1740 struct GNUNET_MQ_Envelope *ev;
1742
1744 "union operation failed\n");
1746 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1747 msg->request_id = htonl (op->client_request_id);
1748 msg->element_type = htons (0);
1749 GNUNET_MQ_send (op->set->cs->mq,
1750 ev);
1752}
1753
1754
1765static void
1767{
1768 int security_level_lb = -1 * SECURITY_LEVEL;
1769 uint64_t duplicates = op->received_fresh - op->received_total;
1770
1771 if (GNUNET_YES != op->byzantine)
1772 return;
1773
1774 /*
1775 * Protect full sync from receiving double element when in FULL SENDING
1776 */
1777 if (PHASE_FULL_SENDING == op->phase)
1778 {
1779 if (duplicates > 0)
1780 {
1782 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1783 "mode of operation this is not allowed! Duplicates: %llu\n",
1784 (unsigned long long) duplicates);
1785 GNUNET_break_op (0);
1787 return;
1788 }
1789
1790 }
1791
1792 /*
1793 * Protect full sync with probabilistic algorithm
1794 */
1795 if (PHASE_FULL_RECEIVING == op->phase)
1796 {
1797
1798 long double base = (1 - (long double) (op->remote_set_diff
1799 / (long double) (op->initial_size
1800 + op->
1801 remote_set_diff)));
1802 long double exponent = (op->received_total - (op->received_fresh * ((long
1803 double)
1804 op->
1805 initial_size
1806 / (long
1807 double)
1808 op->
1809 remote_set_diff)));
1810 long double value = exponent * (log2l (base) / log2l (2));
1811 if (0 == op->remote_set_diff)
1812 op->remote_set_diff = 1;
1813 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1814 {
1816 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1817 "to many duplicated full element : %LF\n",
1818 value);
1819 GNUNET_break_op (0);
1821 return;
1822 }
1823 }
1824}
1825
1826
1831static void
1833{
1834 double probability = op->differential_sync_iterations * (log2l (
1836 / log2l (2));
1837 if ((-1 * SECURITY_LEVEL) > probability)
1838 {
1840 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1841 "switches in differential sync: %u\n",
1842 op->differential_sync_iterations);
1843 GNUNET_break_op (0);
1845 return;
1846 }
1847}
1848
1849
1857static struct IBF_Key
1858get_ibf_key (const struct GNUNET_HashCode *src)
1859{
1860 struct IBF_Key key;
1861 uint16_t salt = 0;
1862
1865 &key, sizeof(key),
1866 src, sizeof *src,
1867 &salt, sizeof(salt)));
1868 return key;
1869}
1870
1871
1875struct GetElementContext
1876{
1880 struct GNUNET_HashCode hash;
1881
1885 struct KeyEntry *k;
1886};
1887
1888
1899static int
1901 uint32_t key,
1902 void *value)
1903{
1904 struct GetElementContext *ctx = cls;
1905 struct KeyEntry *k = value;
1906
1907 GNUNET_assert (NULL != k);
1909 &ctx->hash))
1910 {
1911 ctx->k = k;
1912 return GNUNET_NO;
1913 }
1914 return GNUNET_YES;
1915}
1916
1917
1926static struct KeyEntry *
1928 const struct GNUNET_HashCode *element_hash)
1929{
1930 int ret;
1931 struct IBF_Key ibf_key;
1932 struct GetElementContext ctx = { { { 0 } }, 0 };
1933
1934 ctx.hash = *element_hash;
1935
1936 ibf_key = get_ibf_key (element_hash);
1938 (uint32_t) ibf_key.key_val
1939 ,
1941 &ctx);
1942
1943 /* was the iteration aborted because we found the element? */
1944 if (GNUNET_SYSERR == ret)
1945 {
1946 GNUNET_assert (NULL != ctx.k);
1947 return ctx.k;
1948 }
1949 return NULL;
1950}
1951
1952
1967static void
1969 struct ElementEntry *ee,
1970 int received)
1971{
1972 struct IBF_Key ibf_key;
1973 struct KeyEntry *k;
1974
1976 k = GNUNET_new (struct KeyEntry);
1977 k->element = ee;
1978 k->ibf_key = ibf_key;
1979 k->received = received;
1981 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1982 (uint32_t) ibf_key.key_val
1983 ,
1984 k,
1986}
1987
1988
1993static void
1994salt_key (const struct IBF_Key *k_in,
1995 uint32_t salt,
1996 struct IBF_Key *k_out)
1997{
1998 int s = (salt * 7) % 64;
1999 uint64_t x = k_in->key_val;
2000
2001 /* rotate ibf key */
2002 x = (x >> s) | (x << (64 - s));
2003 k_out->key_val = x;
2004}
2005
2006
2010static void
2011unsalt_key (const struct IBF_Key *k_in,
2012 uint32_t salt,
2013 struct IBF_Key *k_out)
2014{
2015 int s = (salt * 7) % 64;
2016 uint64_t x = k_in->key_val;
2017
2018 x = (x << s) | (x >> (64 - s));
2019 k_out->key_val = x;
2020}
2021
2022
2030static int
2032 uint32_t key,
2033 void *value)
2034{
2035 struct Operation *op = cls;
2036 struct KeyEntry *ke = value;
2037 struct IBF_Key salted_key;
2038
2040 "[OP %p] inserting %lx (hash %s) into ibf\n",
2041 op,
2042 (unsigned long) ke->ibf_key.key_val,
2044 salt_key (&ke->ibf_key,
2045 op->salt_send,
2046 &salted_key);
2047 ibf_insert (op->local_ibf, salted_key);
2048 return GNUNET_YES;
2049}
2050
2051
2059static int
2061 struct Operation *op)
2062{
2063 return ee->generation >= op->generation_created;
2064}
2065
2066
2077static int
2079 const struct GNUNET_HashCode *key,
2080 void *value)
2081{
2082 struct Operation *op = cls;
2083 struct ElementEntry *ee = value;
2084
2085 /* make sure that the element belongs to the set at the time
2086 * of creating the operation */
2087 if (GNUNET_NO ==
2089 op))
2090 return GNUNET_YES;
2093 ee,
2094 GNUNET_NO);
2095 return GNUNET_YES;
2096}
2097
2098
2104static void
2106{
2107 unsigned int len;
2108
2109 GNUNET_assert (NULL == op->key_to_element);
2110 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2111 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2112 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2114 op);
2115}
2116
2117
2126static int
2128 uint32_t size)
2129{
2130 GNUNET_assert (NULL != op->key_to_element);
2131
2132 if (NULL != op->local_ibf)
2133 ibf_destroy (op->local_ibf);
2134 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2135 op->local_ibf = ibf_create (size,
2136 ((uint8_t) op->ibf_number_buckets_per_element));
2137 if (NULL == op->local_ibf)
2138 {
2140 "Failed to allocate local IBF\n");
2141 return GNUNET_SYSERR;
2142 }
2145 op);
2146 return GNUNET_OK;
2147}
2148
2149
2159static int
2161 uint32_t ibf_size)
2162{
2163 uint64_t buckets_sent = 0;
2164 struct InvertibleBloomFilter *ibf;
2165
2169 uint32_t ibf_min_size = IBF_MIN_SIZE;
2170
2171 op->differential_sync_iterations++;
2172 if (ibf_size < ibf_min_size)
2173 {
2174 ibf_size = ibf_min_size;
2175 }
2176 if (GNUNET_OK !=
2178 {
2179 /* allocation failed */
2180 return GNUNET_SYSERR;
2181 }
2182
2184 "sending ibf of size %u\n",
2185 (unsigned int) ibf_size);
2186
2187 {
2188 char name[64];
2189
2191 sizeof(name),
2192 "# sent IBF (order %u)",
2193 ibf_size);
2195 }
2196
2197 ibf = op->local_ibf;
2198
2199 while (buckets_sent < ibf_size)
2200 {
2201 unsigned int buckets_in_message;
2202 struct GNUNET_MQ_Envelope *ev;
2203 struct IBFMessage *msg;
2204
2205 buckets_in_message = ibf_size - buckets_sent;
2206 /* limit to maximum */
2207 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2208 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2209
2210#if MEASURE_PERFORMANCE
2211 perf_store.ibf.sent += 1;
2212 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2213#endif
2215 buckets_in_message * IBF_BUCKET_SIZE,
2217 msg->ibf_size = ibf_size;
2218 msg->offset = htonl (buckets_sent);
2219 msg->salt = htonl (op->salt_send);
2220 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2221
2222
2223 ibf_write_slice (ibf, buckets_sent,
2224 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2225 buckets_sent += buckets_in_message;
2227 "ibf chunk size %u, %llu/%u sent\n",
2228 (unsigned int) buckets_in_message,
2229 (unsigned long long) buckets_sent,
2230 (unsigned int) ibf_size);
2231 GNUNET_MQ_send (op->mq, ev);
2232 }
2233
2234 /* The other peer must decode the IBF, so
2235 * we're passive. */
2236 op->phase = PHASE_PASSIVE_DECODING;
2237 return GNUNET_OK;
2238}
2239
2240
2248static unsigned int
2249get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2250 float ibf_bucket_number_factor)
2251{
2254 return (((int) (diff * ibf_bucket_number_factor)) | 1);
2255
2256}
2257
2258
2259static unsigned int
2260get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
2261 decoded_elements, unsigned int last_ibf_size)
2262{
2263 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2264 - (ibf_bucket_number_factor
2265 * decoded_elements));
2268 return next_size | 1;
2269}
2270
2271
2281static int
2283 const struct GNUNET_HashCode *key,
2284 void *value)
2285{
2286 struct Operation *op = cls;
2287 struct GNUNET_SETU_ElementMessage *emsg;
2288 struct ElementEntry *ee = value;
2289 struct GNUNET_SETU_Element *el = &ee->element;
2290 struct GNUNET_MQ_Envelope *ev;
2291
2293 "Sending element %s\n",
2294 GNUNET_h2s (key));
2295#if MEASURE_PERFORMANCE
2296 perf_store.element_full.received += 1;
2297 perf_store.element_full.received_var_bytes += el->size;
2298#endif
2299 ev = GNUNET_MQ_msg_extra (emsg,
2300 el->size,
2302 emsg->element_type = htons (el->element_type);
2303 GNUNET_memcpy (&emsg[1],
2304 el->data,
2305 el->size);
2306 GNUNET_MQ_send (op->mq,
2307 ev);
2308 return GNUNET_YES;
2309}
2310
2311
2317static void
2319{
2320 struct GNUNET_MQ_Envelope *ev;
2321
2322 op->phase = PHASE_FULL_SENDING;
2324 "Dedicing to transmit the full set\n");
2325 /* FIXME: use a more memory-friendly way of doing this with an
2326 iterator, just as we do in the non-full case! */
2327
2328 // Randomize Elements to send
2329 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2330 32,GNUNET_NO);
2331 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2332 UINT64_MAX);
2333 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2334 &
2336 op);
2337
2339 op->set->content->elements_randomized,
2341 op);
2342#if MEASURE_PERFORMANCE
2343 perf_store.full_done.sent += 1;
2344#endif
2346 GNUNET_MQ_send (op->mq,
2347 ev);
2348}
2349
2350
2357static int
2359 const struct StrataEstimatorMessage *msg)
2360{
2361 struct Operation *op = cls;
2362 int is_compressed;
2363 size_t len;
2364
2365 if (op->phase != PHASE_EXPECT_SE)
2366 {
2367 GNUNET_break (0);
2368 return GNUNET_SYSERR;
2369 }
2370 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2371 msg->header.type));
2372 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2373 if ((GNUNET_NO == is_compressed) &&
2375 {
2376 GNUNET_break (0);
2377 return GNUNET_SYSERR;
2378 }
2379 return GNUNET_OK;
2380}
2381
2382
2389static void
2391 const struct StrataEstimatorMessage *msg)
2392{
2393#if MEASURE_PERFORMANCE
2394 perf_store.se.received += 1;
2395 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2397#endif
2398 struct Operation *op = cls;
2399 struct MultiStrataEstimator *remote_se;
2400 unsigned int diff;
2401 uint64_t other_size;
2402 size_t len;
2403 int is_compressed;
2404 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2405 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2406 op->set->content->elements);
2407 // Setting peer site to receiving peer
2408 op->peer_site = 1;
2409
2413 if (GNUNET_OK !=
2414 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2415 {
2416 GNUNET_break (0);
2418 return;
2419 }
2420
2422 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2423 {
2425 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2426 msg->se_count);
2427 GNUNET_break_op (0);
2429 return;
2430 }
2431
2432 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2433 msg->header.type));
2435 "# bytes of SE received",
2436 ntohs (msg->header.size),
2437 GNUNET_NO);
2438 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2439 other_size = GNUNET_ntohll (msg->set_size);
2440 op->remote_element_count = other_size;
2441
2442 if (op->byzantine_upper_bound < op->remote_element_count)
2443 {
2445 "Exceeded configured upper bound <%" PRIu64 "> of element: %u\n",
2446 op->byzantine_upper_bound,
2447 op->remote_element_count);
2449 return;
2450 }
2451
2455 if (NULL == remote_se)
2456 {
2457 /* insufficient resources, fail */
2459 return;
2460 }
2461 if (GNUNET_OK !=
2463 len,
2464 is_compressed,
2465 msg->se_count,
2467 remote_se))
2468 {
2469 /* decompression failed */
2470 strata_estimator_destroy (remote_se);
2472 return;
2473 }
2474 GNUNET_assert (NULL != op->se);
2475 strata_estimator_difference (remote_se,
2476 op->se);
2477 {
2478 /* Calculate remote local diff */
2479 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2480 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2481 uint64_t avg_element_size = 0;
2482
2483 /* Prevent estimations from overshooting max element */
2484 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2485 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2486 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2487 diff_local = op->byzantine_upper_bound - op->local_element_count;
2488 if ((diff_remote < 0) || (diff_local < 0))
2489 {
2490 strata_estimator_destroy (remote_se);
2492 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2493 "malicious: remote diff %ld, local diff: %ld\n",
2494 diff_remote, diff_local);
2495 GNUNET_break_op (0);
2497 return;
2498 }
2499 /* Make estimation more precise in initial sync cases */
2500 if (0 == op->remote_element_count)
2501 {
2502 diff_remote = 0;
2503 diff_local = op->local_element_count;
2504 }
2505 if (0 == op->local_element_count)
2506 {
2507 diff_local = 0;
2508 diff_remote = op->remote_element_count;
2509 }
2510
2511 diff = diff_remote + diff_local;
2512 op->remote_set_diff = diff_remote;
2513
2515 if (0 < op->local_element_count)
2516 {
2517 op->total_elements_size_local = 0;
2518 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2519 &
2521 op);
2522 avg_element_size = op->total_elements_size_local / op->local_element_count
2523 ;
2524 }
2525
2526 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2528 (
2529 op->set->content
2530 ->
2531 elements),
2532 op->
2533 remote_element_count,
2534 diff_remote,
2535 diff_local,
2536 op->
2537 rtt_bandwidth_tradeoff,
2538 op->
2539 ibf_bucket_number_factor);
2540
2541#if MEASURE_PERFORMANCE
2542 perf_store.se_diff_local = diff_local;
2543 perf_store.se_diff_remote = diff_remote;
2544 perf_store.se_diff = diff;
2545 perf_store.mode_of_operation = op->mode_of_operation;
2546#endif
2547
2548 strata_estimator_destroy (remote_se);
2550 op->se = NULL;
2552 "got se diff=%d, using ibf size %d\n",
2553 diff,
2554 1U << get_size_from_difference (diff, op->
2555 ibf_number_buckets_per_element,
2556 op->ibf_bucket_number_factor));
2557
2558 {
2559 char *set_debug;
2560
2561 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2562 if ((NULL != set_debug) &&
2563 (0 == strcmp (set_debug, "1")))
2564 {
2565 FILE *f = fopen ("set.log", "a");
2566 fprintf (f, "%llu\n", (unsigned long long) diff);
2567 fclose (f);
2568 }
2569 }
2570
2571 if ((GNUNET_YES == op->byzantine) &&
2572 (other_size < op->byzantine_lower_bound))
2573 {
2574 GNUNET_break (0);
2576 return;
2577 }
2578
2579 if ((GNUNET_YES == op->force_full) ||
2580 (op->mode_of_operation != DIFFERENTIAL_SYNC))
2581 {
2583 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2584 diff,
2585 (unsigned long long) op->initial_size);
2587 "# of full sends",
2588 1,
2589 GNUNET_NO);
2590 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2591 {
2592 struct TransmitFullMessage *signal_msg;
2593 struct GNUNET_MQ_Envelope *ev;
2594 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2596 signal_msg->remote_set_difference = htonl (diff_local);
2597 signal_msg->remote_set_size = htonl (op->local_element_count);
2598 signal_msg->local_set_difference = htonl (diff_remote);
2599 GNUNET_MQ_send (op->mq,
2600 ev);
2601 send_full_set (op);
2602 }
2603 else
2604 {
2605 struct GNUNET_MQ_Envelope *ev;
2606
2608 "Telling other peer that we expect its full set\n");
2609 op->phase = PHASE_FULL_RECEIVING;
2610#if MEASURE_PERFORMANCE
2611 perf_store.request_full.sent += 1;
2612#endif
2613 {
2614 struct TransmitFullMessage *signal_msg;
2615 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct
2618 signal_msg->remote_set_difference = htonl (diff_local);
2619 signal_msg->remote_set_size = htonl (op->local_element_count);
2620 signal_msg->local_set_difference = htonl (diff_remote);
2621 GNUNET_MQ_send (op->mq,
2622 ev);
2623 }
2624 }
2625 }
2626 else
2627 {
2629 "# of ibf sends",
2630 1,
2631 GNUNET_NO);
2632 if (GNUNET_OK !=
2633 send_ibf (op,
2635 op->ibf_number_buckets_per_element
2636 ,
2637 op->ibf_bucket_number_factor)))
2638 {
2639 /* Internal error, best we can do is shut the connection */
2641 "Failed to send IBF, closing connection\n");
2643 return;
2644 }
2645 }
2646 GNUNET_CADET_receive_done (op->channel);
2647 }
2648}
2649
2650
2658static int
2660 uint32_t key,
2661 void *value)
2662{
2663 struct SendElementClosure *sec = cls;
2664 struct Operation *op = sec->op;
2665 struct KeyEntry *ke = value;
2666 struct GNUNET_MQ_Envelope *ev;
2667 struct GNUNET_MessageHeader *mh;
2668
2669 /* Detect 32-bit key collision for the 64-bit IBF keys. */
2670 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2671 {
2672 op->active_passive_switch_required = true;
2673 return GNUNET_YES;
2674 }
2675
2676 /* Prevent implementation from sending a offer multiple times in case of roll switch */
2677 if (GNUNET_YES ==
2679 op->message_control_flow,
2680 &ke->element->element_hash,
2682 )
2683 {
2685 "Skipping already sent processed element offer!\n");
2686 return GNUNET_YES;
2687 }
2688
2689 /* Save send offer message for message control */
2690 if (GNUNET_YES !=
2692 op->message_control_flow,
2694 &ke->element->element_hash,
2696 )
2697 {
2699 "Double offer message sent found!\n");
2700 GNUNET_break (0);
2702 return GNUNET_NO;
2703 }
2704 ;
2705
2706 /* Mark element to be expected to received */
2707 if (GNUNET_YES !=
2709 op->message_control_flow,
2711 &ke->element->element_hash,
2713 )
2714 {
2716 "Double demand received found!\n");
2717 GNUNET_break (0);
2719 return GNUNET_NO;
2720 }
2721 ;
2722#if MEASURE_PERFORMANCE
2723 perf_store.offer.sent += 1;
2724 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2725#endif
2727 sizeof(struct GNUNET_HashCode),
2729 GNUNET_assert (NULL != ev);
2730 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2732 "[OP %p] sending element offer (%s) to peer\n",
2733 op,
2735 GNUNET_MQ_send (op->mq, ev);
2736 return GNUNET_YES;
2737}
2738
2739
2746static void
2748 struct IBF_Key ibf_key)
2749{
2750 struct SendElementClosure send_cls;
2751
2752 send_cls.ibf_key = ibf_key;
2753 send_cls.op = op;
2755 op->key_to_element,
2756 (uint32_t) ibf_key.
2757 key_val,
2759 &send_cls);
2760}
2761
2762
2770static int
2772{
2773 struct IBF_Key key;
2774 struct IBF_Key last_key;
2775 int side;
2776 unsigned int num_decoded;
2777 struct InvertibleBloomFilter *diff_ibf;
2778
2780
2781 if (GNUNET_OK !=
2782 prepare_ibf (op,
2783 op->remote_ibf->size))
2784 {
2785 GNUNET_break (0);
2786 /* allocation failed */
2787 return GNUNET_SYSERR;
2788 }
2789
2790 diff_ibf = ibf_dup (op->local_ibf);
2791 ibf_subtract (diff_ibf,
2792 op->remote_ibf);
2793
2794 ibf_destroy (op->remote_ibf);
2795 op->remote_ibf = NULL;
2796
2798 "decoding IBF (size=%u)\n",
2799 diff_ibf->size);
2800
2801 num_decoded = 0;
2802 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2803
2804 while (1)
2805 {
2806 int res;
2807 int cycle_detected = GNUNET_NO;
2808
2809 last_key = key;
2810
2811 res = ibf_decode (diff_ibf,
2812 &side,
2813 &key);
2814 if (res == GNUNET_OK)
2815 {
2817 "decoded ibf key %lx\n",
2818 (unsigned long) key.key_val);
2819 num_decoded += 1;
2820 if ((num_decoded > diff_ibf->size) ||
2821 ((num_decoded > 1) &&
2822 (last_key.key_val == key.key_val)))
2823 {
2825 "detected cyclic ibf (decoded %u/%u)\n",
2826 num_decoded,
2827 diff_ibf->size);
2828 cycle_detected = GNUNET_YES;
2829 }
2830 }
2831 if ((GNUNET_SYSERR == res) ||
2832 (GNUNET_YES == cycle_detected))
2833 {
2834 uint32_t next_size;
2837 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2838 diff_ibf->size);
2841 {
2842 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2843
2844 if (next_size<ibf_min_size)
2845 next_size = ibf_min_size;
2846 }
2847
2848 if (next_size <= MAX_IBF_SIZE)
2849 {
2851 "decoding failed, sending larger ibf (size %u)\n",
2852 next_size);
2854 "# of IBF retries",
2855 1,
2856 GNUNET_NO);
2857#if MEASURE_PERFORMANCE
2858 perf_store.active_passive_switches += 1;
2859#endif
2860
2861 op->salt_send = op->salt_receive++;
2862
2863 if (GNUNET_OK !=
2864 send_ibf (op, next_size))
2865 {
2866 /* Internal error, best we can do is shut the connection */
2868 "Failed to send IBF, closing connection\n");
2870 ibf_destroy (diff_ibf);
2871 return GNUNET_SYSERR;
2872 }
2873 }
2874 else
2875 {
2877 "# of failed union operations (too large)",
2878 1,
2879 GNUNET_NO);
2880 // XXX: Send the whole set, element-by-element
2882 "set union failed: reached ibf limit\n");
2884 ibf_destroy (diff_ibf);
2885 return GNUNET_SYSERR;
2886 }
2887 break;
2888 }
2889 if (GNUNET_NO == res)
2890 {
2891 struct GNUNET_MQ_Envelope *ev;
2892
2894 "transmitted all values, sending DONE\n");
2895
2896#if MEASURE_PERFORMANCE
2897 perf_store.done.sent += 1;
2898#endif
2900 GNUNET_MQ_send (op->mq, ev);
2901 /* We now wait until we get a DONE message back
2902 * and then wait for our MQ to be flushed and all our
2903 * demands be delivered. */
2904 break;
2905 }
2906 if (1 == side)
2907 {
2908 struct IBF_Key unsalted_key;
2909 unsalt_key (&key,
2910 op->salt_receive,
2911 &unsalted_key);
2913 unsalted_key);
2914 }
2915 else if (-1 == side)
2916 {
2917 struct GNUNET_MQ_Envelope *ev;
2918 struct InquiryMessage *msg;
2919
2920#if MEASURE_PERFORMANCE
2921 perf_store.inquery.sent += 1;
2922 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2923#endif
2924
2926 struct GNUNET_HashContext *hashed_key_context =
2928 struct GNUNET_HashCode *hashed_key = (struct
2930 sizeof(struct GNUNET_HashCode));
2932 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2933 &key,
2934 sizeof(struct IBF_Key));
2935 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2936 hashed_key);
2937 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2938 hashed_key,
2939 &mcfs,
2941 );
2942
2943 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2944 * the effort additional complexity. */
2946 sizeof(struct IBF_Key),
2948 msg->salt = htonl (op->salt_receive);
2949 GNUNET_memcpy (&msg[1],
2950 &key,
2951 sizeof(struct IBF_Key));
2953 "sending element inquiry for IBF key %lx\n",
2954 (unsigned long) key.key_val);
2955 GNUNET_MQ_send (op->mq, ev);
2956 }
2957 else
2958 {
2959 GNUNET_assert (0);
2960 }
2961 }
2962 ibf_destroy (diff_ibf);
2963 return GNUNET_OK;
2964}
2965
2966
2974static int
2976 const struct TransmitFullMessage *msg)
2977{
2978 return GNUNET_OK;
2979}
2980
2981
2988static void
2990 const struct TransmitFullMessage *msg)
2991{
2992 struct Operation *op = cls;
2993
2997 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2998 if (GNUNET_OK !=
2999 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3000 {
3001 GNUNET_break (0);
3003 return;
3004 }
3005
3007 op->remote_element_count = ntohl (msg->remote_set_size);
3008 op->remote_set_diff = ntohl (msg->remote_set_difference);
3009 op->local_set_diff = ntohl (msg->local_set_difference);
3010
3013 {
3015 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3016 "criteria\n");
3017 GNUNET_break_op (0);
3019 return;
3020 }
3021
3023 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3024 op->set->content->elements);
3025 {
3026 uint64_t avg_element_size = 0;
3028 if (0 < op->local_element_count)
3029 {
3030 op->total_elements_size_local = 0;
3031 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3032 &
3034 op);
3035 avg_element_size = op->total_elements_size_local / op->local_element_count
3036 ;
3037 }
3038
3041 op->
3043 op->
3045 op->local_set_diff,
3046 op->remote_set_diff
3047 ,
3048 op->
3050 op->
3053 {
3055 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3056 " : %d\n", mode_of_operation);
3057 GNUNET_break_op (0);
3059 return;
3060 }
3061 op->phase = PHASE_FULL_RECEIVING;
3062 }
3063}
3064
3065
3076static int
3078 const struct IBFMessage *msg)
3079{
3080 struct Operation *op = cls;
3081 unsigned int buckets_in_message;
3082
3083 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3085 if (0 == buckets_in_message)
3086 {
3087 GNUNET_break_op (0);
3088 return GNUNET_SYSERR;
3089 }
3090 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3092 {
3093 GNUNET_break_op (0);
3094 return GNUNET_SYSERR;
3095 }
3096 if (op->phase == PHASE_EXPECT_IBF_LAST)
3097 {
3098 if (ntohl (msg->offset) != op->ibf_buckets_received)
3099 {
3100 GNUNET_break_op (0);
3101 return GNUNET_SYSERR;
3102 }
3103
3104 if (msg->ibf_size != op->remote_ibf->size)
3105 {
3106 GNUNET_break_op (0);
3107 return GNUNET_SYSERR;
3108 }
3109 if (ntohl (msg->salt) != op->salt_receive)
3110 {
3111 GNUNET_break_op (0);
3112 return GNUNET_SYSERR;
3113 }
3114 }
3115 else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3116 (op->phase != PHASE_EXPECT_IBF))
3117 {
3118 GNUNET_break_op (0);
3119 return GNUNET_SYSERR;
3120 }
3121
3122 return GNUNET_OK;
3123}
3124
3125
3135static void
3137 const struct IBFMessage *msg)
3138{
3139 struct Operation *op = cls;
3140 unsigned int buckets_in_message;
3144 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3146 if (GNUNET_OK !=
3147 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3148 {
3149 GNUNET_break (0);
3151 return;
3152 }
3153 op->differential_sync_iterations++;
3155 op->active_passive_switch_required = false;
3156
3157#if MEASURE_PERFORMANCE
3158 perf_store.ibf.received += 1;
3159 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3160#endif
3161
3162 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3164 if ((op->phase == PHASE_PASSIVE_DECODING) ||
3165 (op->phase == PHASE_EXPECT_IBF))
3166 {
3167 op->phase = PHASE_EXPECT_IBF_LAST;
3168 GNUNET_assert (NULL == op->remote_ibf);
3170 "Creating new ibf of size %u\n",
3171 ntohl (msg->ibf_size));
3172 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3173 op->remote_ibf = ibf_create (msg->ibf_size,
3174 ((uint8_t) op->ibf_number_buckets_per_element))
3175 ;
3176 op->salt_receive = ntohl (msg->salt);
3178 "Receiving new IBF with salt %u\n",
3179 op->salt_receive);
3180 if (NULL == op->remote_ibf)
3181 {
3183 "Failed to parse remote IBF, closing connection\n");
3185 return;
3186 }
3187 op->ibf_buckets_received = 0;
3188 if (0 != ntohl (msg->offset))
3189 {
3190 GNUNET_break_op (0);
3192 return;
3193 }
3194 }
3195 else
3196 {
3199 "Received more of IBF\n");
3200 }
3201 GNUNET_assert (NULL != op->remote_ibf);
3202
3203 ibf_read_slice (&msg[1],
3204 op->ibf_buckets_received,
3205 buckets_in_message,
3206 op->remote_ibf, msg->ibf_counter_bit_length);
3207 op->ibf_buckets_received += buckets_in_message;
3208
3209 if (op->ibf_buckets_received == op->remote_ibf->size)
3210 {
3212 "received full ibf\n");
3213 op->phase = PHASE_ACTIVE_DECODING;
3214 if (GNUNET_OK !=
3216 {
3217 /* Internal error, best we can do is shut down */
3219 "Failed to decode IBF, closing connection\n");
3221 return;
3222 }
3223 }
3224 GNUNET_CADET_receive_done (op->channel);
3225}
3226
3227
3236static void
3238 const struct GNUNET_SETU_Element *element,
3240{
3241 struct GNUNET_MQ_Envelope *ev;
3242 struct GNUNET_SETU_ResultMessage *rm;
3243
3245 "sending element (size %u) to client\n",
3246 element->size);
3247 GNUNET_assert (0 != op->client_request_id);
3248 ev = GNUNET_MQ_msg_extra (rm,
3249 element->size,
3251 if (NULL == ev)
3252 {
3253 GNUNET_MQ_discard (ev);
3254 GNUNET_break (0);
3255 return;
3256 }
3257 rm->result_status = htons (status);
3258 rm->request_id = htonl (op->client_request_id);
3259 rm->element_type = htons (element->element_type);
3261 op->key_to_element));
3262 GNUNET_memcpy (&rm[1],
3263 element->data,
3264 element->size);
3265 GNUNET_MQ_send (op->set->cs->mq,
3266 ev);
3267}
3268
3269
3275static void
3277{
3278 unsigned int num_demanded;
3280 op->message_control_flow,
3281 &
3283 op);
3284 num_demanded = GNUNET_CONTAINER_multihashmap_size (
3285 op->demanded_hashes);
3286 if (PHASE_FINISH_WAITING == op->phase)
3287 {
3289 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3290 num_demanded, op->peer_site);
3291 if (-1 != send_done)
3292 {
3293 struct GNUNET_MQ_Envelope *ev;
3294
3295 op->phase = PHASE_FINISHED;
3296#if MEASURE_PERFORMANCE
3297 perf_store.done.sent += 1;
3298#endif
3300 GNUNET_MQ_send (op->mq,
3301 ev);
3302 /* We now wait until the other peer sends P2P_OVER
3303 * after it got all elements from us. */
3304 }
3305 }
3306 if (PHASE_FINISH_CLOSING == op->phase)
3307 {
3309 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3310 num_demanded, op->peer_site);
3311 if (-1 != send_done)
3312 {
3313 op->phase = PHASE_FINISHED;
3316 }
3317 }
3318}
3319
3320
3327static int
3329 const struct GNUNET_SETU_ElementMessage *emsg)
3330{
3331 struct Operation *op = cls;
3332
3333 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3334 {
3335 GNUNET_break_op (0);
3336 return GNUNET_SYSERR;
3337 }
3338 return GNUNET_OK;
3339}
3340
3341
3350static void
3352 const struct GNUNET_SETU_ElementMessage *emsg)
3353{
3354 struct Operation *op = cls;
3355 struct ElementEntry *ee;
3356 struct KeyEntry *ke;
3357 uint16_t element_size;
3358
3362 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3364 if (GNUNET_OK !=
3365 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3366 {
3367 GNUNET_break (0);
3369 return;
3370 }
3371
3372 element_size = ntohs (emsg->header.size) - sizeof(struct
3374#if MEASURE_PERFORMANCE
3375 perf_store.element.received += 1;
3376 perf_store.element.received_var_bytes += element_size;
3377#endif
3378
3379 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3380 GNUNET_memcpy (&ee[1],
3381 &emsg[1],
3382 element_size);
3383 ee->element.size = element_size;
3384 ee->element.data = &ee[1];
3385 ee->element.element_type = ntohs (emsg->element_type);
3386 ee->remote = GNUNET_YES;
3388 &ee->element_hash);
3389 if (GNUNET_NO ==
3390 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3391 &ee->element_hash,
3392 NULL))
3393 {
3394 /* We got something we didn't demand, since it's not in our map. */
3395 GNUNET_break_op (0);
3397 return;
3398 }
3399
3400 if (GNUNET_OK !=
3402 op->message_control_flow,
3404 &ee->element_hash,
3406 )
3407 {
3409 "An element has been received more than once!\n");
3410 GNUNET_break (0);
3412 return;
3413 }
3414
3416 "Got element (size %u, hash %s) from peer\n",
3417 (unsigned int) element_size,
3418 GNUNET_h2s (&ee->element_hash));
3419
3421 "# received elements",
3422 1,
3423 GNUNET_NO);
3425 "# exchanged elements",
3426 1,
3427 GNUNET_NO);
3428
3429 op->received_total++;
3430
3431 ke = op_get_element (op,
3432 &ee->element_hash);
3433 if (NULL != ke)
3434 {
3435 /* Got repeated element. Should not happen since
3436 * we track demands. */
3438 "# repeated elements",
3439 1,
3440 GNUNET_NO);
3441 ke->received = GNUNET_YES;
3442 GNUNET_free (ee);
3443 }
3444 else
3445 {
3447 "Registering new element from remote peer\n");
3448 op->received_fresh++;
3450 /* only send results immediately if the client wants it */
3452 &ee->element,
3454 }
3455
3456 if ((op->received_total > 8) &&
3457 (op->received_fresh < op->received_total / 3))
3458 {
3459 /* The other peer gave us lots of old elements, there's something wrong. */
3460 GNUNET_break_op (0);
3462 return;
3463 }
3464 GNUNET_CADET_receive_done (op->channel);
3465 maybe_finish (op);
3466}
3467
3468
3475static int
3477 const struct GNUNET_SETU_ElementMessage *emsg)
3478{
3479 struct Operation *op = cls;
3480
3481 (void) op;
3482
3483 // FIXME: check that we expect full elements here?
3484 return GNUNET_OK;
3485}
3486
3487
3494static void
3496 const struct GNUNET_SETU_ElementMessage *emsg)
3497{
3498 struct Operation *op = cls;
3499 struct ElementEntry *ee;
3500 struct KeyEntry *ke;
3501 uint16_t element_size;
3502
3506 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3507 if (GNUNET_OK !=
3508 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3509 {
3510 GNUNET_break (0);
3512 return;
3513 }
3514
3515 element_size = ntohs (emsg->header.size)
3516 - sizeof(struct GNUNET_SETU_ElementMessage);
3517
3518#if MEASURE_PERFORMANCE
3519 perf_store.element_full.received += 1;
3520 perf_store.element_full.received_var_bytes += element_size;
3521#endif
3522
3523 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3524 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3525 ee->element.size = element_size;
3526 ee->element.data = &ee[1];
3527 ee->element.element_type = ntohs (emsg->element_type);
3528 ee->remote = GNUNET_YES;
3530 &ee->element_hash);
3532 "Got element (full diff, size %u, hash %s) from peer\n",
3533 (unsigned int) element_size,
3534 GNUNET_h2s (&ee->element_hash));
3535
3537 "# received elements",
3538 1,
3539 GNUNET_NO);
3541 "# exchanged elements",
3542 1,
3543 GNUNET_NO);
3544
3545 op->received_total++;
3546 ke = op_get_element (op,
3547 &ee->element_hash);
3548 if (NULL != ke)
3549 {
3551 "# repeated elements",
3552 1,
3553 GNUNET_NO);
3555 ke->received = GNUNET_YES;
3556 GNUNET_free (ee);
3557 }
3558 else
3559 {
3561 "Registering new element from remote peer\n");
3562 op->received_fresh++;
3564 /* only send results immediately if the client wants it */
3566 &ee->element,
3568 }
3569
3570
3571 if ((GNUNET_YES == op->byzantine) &&
3572 (op->received_total > op->remote_element_count) )
3573 {
3574 /* The other peer gave us lots of old elements, there's something wrong. */
3576 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3577 (unsigned long long) op->received_total,
3578 (unsigned long long) op->remote_element_count);
3579 GNUNET_break_op (0);
3581 return;
3582 }
3583 GNUNET_CADET_receive_done (op->channel);
3584}
3585
3586
3594static int
3596 const struct InquiryMessage *msg)
3597{
3598 struct Operation *op = cls;
3599 unsigned int num_keys;
3600
3601 if (op->phase != PHASE_PASSIVE_DECODING)
3602 {
3603 GNUNET_break_op (0);
3604 return GNUNET_SYSERR;
3605 }
3606 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3607 / sizeof(struct IBF_Key);
3608 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3609 != num_keys * sizeof(struct IBF_Key))
3610 {
3611 GNUNET_break_op (0);
3612 return GNUNET_SYSERR;
3613 }
3614 return GNUNET_OK;
3615}
3616
3617
3624static void
3626 const struct InquiryMessage *msg)
3627{
3628 struct Operation *op = cls;
3629 const struct IBF_Key *ibf_key;
3630 unsigned int num_keys;
3631
3635 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3636 if (GNUNET_OK !=
3637 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3638 {
3639 GNUNET_break (0);
3641 return;
3642 }
3643
3644#if MEASURE_PERFORMANCE
3645 perf_store.inquery.received += 1;
3646 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3647 - sizeof(struct InquiryMessage));
3648#endif
3649
3651 "Received union inquiry\n");
3652 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3653 / sizeof(struct IBF_Key);
3654 ibf_key = (const struct IBF_Key *) &msg[1];
3655 {
3657 struct GNUNET_HashContext *hashed_key_context =
3659 struct GNUNET_HashCode *hashed_key = GNUNET_new (struct GNUNET_HashCode);
3661 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3662 &ibf_key,
3663 sizeof(struct IBF_Key));
3664 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3665 hashed_key);
3666 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3667 hashed_key,
3668 &mcfs,
3670 );
3671
3672 while (0 != num_keys--)
3673 {
3674 struct IBF_Key unsalted_key;
3675 unsalt_key (ibf_key,
3676 ntohl (msg->salt),
3677 &unsalted_key);
3679 unsalted_key);
3680 ibf_key++;
3681 }
3682 GNUNET_CADET_receive_done (op->channel);
3683 }
3684}
3685
3686
3697static int
3699 uint32_t key,
3700 void *value)
3701{
3702 struct Operation *op = cls;
3703 struct KeyEntry *ke = value;
3704 struct GNUNET_MQ_Envelope *ev;
3705 struct GNUNET_SETU_ElementMessage *emsg;
3706 struct ElementEntry *ee = ke->element;
3707
3708 if (GNUNET_YES == ke->received)
3709 return GNUNET_YES;
3710#if MEASURE_PERFORMANCE
3711 perf_store.element_full.received += 1;
3712#endif
3713 ev = GNUNET_MQ_msg_extra (emsg,
3714 ee->element.size,
3716 GNUNET_memcpy (&emsg[1],
3717 ee->element.data,
3718 ee->element.size);
3719 emsg->element_type = htons (ee->element.element_type);
3720 GNUNET_MQ_send (op->mq,
3721 ev);
3722 return GNUNET_YES;
3723}
3724
3725
3732static int
3734 const struct TransmitFullMessage *mh)
3735{
3736 return GNUNET_OK;
3737}
3738
3739
3740static void
3742 const struct TransmitFullMessage *msg)
3743{
3744 struct Operation *op = cls;
3745 uint64_t avg_element_size = 0;
3746 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3747
3751 if (GNUNET_OK !=
3752 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3753 {
3754 GNUNET_break (0);
3756 return;
3757 }
3758
3759 op->remote_element_count = ntohl (msg->remote_set_size);
3760 op->remote_set_diff = ntohl (msg->remote_set_difference);
3761 op->local_set_diff = ntohl (msg->local_set_difference);
3762
3763
3765 {
3767 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3768 "criteria\n");
3769 GNUNET_break_op (0);
3771 return;
3772 }
3773
3774#if MEASURE_PERFORMANCE
3775 perf_store.request_full.received += 1;
3776#endif
3777
3779 "Received request for full set transmission\n");
3780
3782 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3783 op->set->content->elements);
3784 if (0 < op->local_element_count)
3785 {
3786 op->total_elements_size_local = 0;
3787 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3788 &
3790 op);
3791 avg_element_size = op->total_elements_size_local / op->local_element_count;
3792 }
3793 {
3794 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3795 op->
3797 op->
3799 op->local_set_diff,
3800 op->remote_set_diff
3801 ,
3802 op->
3804 op->
3807 {
3809 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3810 " : %d\n", mode_of_operation);
3811 GNUNET_break_op (0);
3813 return;
3814 }
3815 }
3816 // FIXME: we need to check that our set is larger than the
3817 // byzantine_lower_bound by some threshold
3818 send_full_set (op);
3819 GNUNET_CADET_receive_done (op->channel);
3820}
3821
3822
3829static void
3831 const struct GNUNET_MessageHeader *mh)
3832{
3833 struct Operation *op = cls;
3834
3838 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3839 if (GNUNET_OK !=
3840 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3841 {
3842 GNUNET_break (0);
3844 return;
3845 }
3846
3847#if MEASURE_PERFORMANCE
3848 perf_store.full_done.received += 1;
3849#endif
3850
3851 switch (op->phase)
3852 {
3854 {
3855 struct GNUNET_MQ_Envelope *ev;
3856
3857 if ((GNUNET_YES == op->byzantine) &&
3858 (op->received_total != op->remote_element_count) )
3859 {
3860 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3862 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3863 (unsigned long long) op->received_total,
3864 (unsigned long long) op->remote_element_count);
3865 GNUNET_break_op (0);
3867 return;
3868 }
3869
3871 "got FULL DONE, sending elements that other peer is missing\n");
3872
3873 /* send all the elements that did not come from the remote peer */
3876 op);
3877#if MEASURE_PERFORMANCE
3878 perf_store.full_done.sent += 1;
3879#endif
3881 GNUNET_MQ_send (op->mq,
3882 ev);
3883 op->phase = PHASE_FINISHED;
3884 /* we now wait until the other peer sends us the OVER message*/
3885 }
3886 break;
3887
3888 case PHASE_FULL_SENDING:
3889 {
3891 "got FULL DONE, finishing\n");
3892 /* We sent the full set, and got the response for that. We're done. */
3893 op->phase = PHASE_FINISHED;
3894 GNUNET_CADET_receive_done (op->channel);
3897 return;
3898 }
3899
3900 default:
3902 "Handle full done phase is %u\n",
3903 (unsigned) op->phase);
3904 GNUNET_break_op (0);
3906 return;
3907 }
3908 GNUNET_CADET_receive_done (op->channel);
3909}
3910
3911
3920static int
3922 const struct GNUNET_MessageHeader *mh)
3923{
3924 struct Operation *op = cls;
3925 unsigned int num_hashes;
3926
3927 (void) op;
3928 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3929 / sizeof(struct GNUNET_HashCode);
3930 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3931 != num_hashes * sizeof(struct GNUNET_HashCode))
3932 {
3933 GNUNET_break_op (0);
3934 return GNUNET_SYSERR;
3935 }
3936 return GNUNET_OK;
3937}
3938
3939
3947static void
3949 const struct GNUNET_MessageHeader *mh)
3950{
3951 struct Operation *op = cls;
3952 struct ElementEntry *ee;
3953 struct GNUNET_SETU_ElementMessage *emsg;
3954 const struct GNUNET_HashCode *hash;
3955 unsigned int num_hashes;
3956 struct GNUNET_MQ_Envelope *ev;
3957
3961 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3963 if (GNUNET_OK !=
3964 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3965 {
3966 GNUNET_break (0);
3968 return;
3969 }
3970#if MEASURE_PERFORMANCE
3971 perf_store.demand.received += 1;
3972 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3974#endif
3975
3976 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3977 / sizeof(struct GNUNET_HashCode);
3978 for (hash = (const struct GNUNET_HashCode *) &mh[1];
3979 num_hashes > 0;
3980 hash++, num_hashes--)
3981 {
3982 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3983 hash);
3984 if (NULL == ee)
3985 {
3986 /* Demand for non-existing element. */
3987 GNUNET_break_op (0);
3989 return;
3990 }
3991
3992 /* Save send demand message for message control */
3993 if (GNUNET_YES !=
3995 op->message_control_flow,
3997 &ee->element_hash,
3999 )
4000 {
4002 "Double demand message received found!\n");
4003 GNUNET_break (0);
4005 return;
4006 }
4007 ;
4008
4009 /* Mark element to be expected to received */
4010 if (GNUNET_YES !=
4012 op->message_control_flow,
4014 &ee->element_hash,
4016 )
4017 {
4019 "Double element message sent found!\n");
4020 GNUNET_break (0);
4022 return;
4023 }
4025 {
4026 /* Probably confused lazily copied sets. */
4027 GNUNET_break_op (0);
4029 return;
4030 }
4031#if MEASURE_PERFORMANCE
4032 perf_store.element.sent += 1;
4033 perf_store.element.sent_var_bytes += ee->element.size;
4034#endif
4035 ev = GNUNET_MQ_msg_extra (emsg,
4036 ee->element.size,
4038 GNUNET_memcpy (&emsg[1],
4039 ee->element.data,
4040 ee->element.size);
4041 emsg->reserved = htons (0);
4042 emsg->element_type = htons (ee->element.element_type);
4044 "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
4045 op,
4046 (unsigned int) ee->element.size,
4047 GNUNET_h2s (&ee->element_hash));
4048 GNUNET_MQ_send (op->mq, ev);
4050 "# exchanged elements",
4051 1,
4052 GNUNET_NO);
4053 if (op->symmetric)
4055 &ee->element,
4057 }
4058 GNUNET_CADET_receive_done (op->channel);
4059 maybe_finish (op);
4060}
4061
4062
4070static int
4072 const struct GNUNET_MessageHeader *mh)
4073{
4074 struct Operation *op = cls;
4075 unsigned int num_hashes;
4076
4077 /* look up elements and send them */
4078 if ((op->phase != PHASE_PASSIVE_DECODING) &&
4079 (op->phase != PHASE_ACTIVE_DECODING))
4080 {
4081 GNUNET_break_op (0);
4082 return GNUNET_SYSERR;
4083 }
4084 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4085 / sizeof(struct GNUNET_HashCode);
4086 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4087 num_hashes * sizeof(struct GNUNET_HashCode))
4088 {
4089 GNUNET_break_op (0);
4090 return GNUNET_SYSERR;
4091 }
4092 return GNUNET_OK;
4093}
4094
4095
4103static void
4105 const struct GNUNET_MessageHeader *mh)
4106{
4107 struct Operation *op = cls;
4108 const struct GNUNET_HashCode *hash;
4109 unsigned int num_hashes;
4113 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4114 if (GNUNET_OK !=
4115 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4116 {
4117 GNUNET_break (0);
4119 return;
4120 }
4121
4122#if MEASURE_PERFORMANCE
4123 perf_store.offer.received += 1;
4124 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4126#endif
4127
4128 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4129 / sizeof(struct GNUNET_HashCode);
4130 for (hash = (const struct GNUNET_HashCode *) &mh[1];
4131 num_hashes > 0;
4132 hash++, num_hashes--)
4133 {
4134 struct ElementEntry *ee;
4135 struct GNUNET_MessageHeader *demands;
4136 struct GNUNET_MQ_Envelope *ev;
4137
4138 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4139 hash);
4140 if (NULL != ee)
4142 continue;
4143
4144 if (GNUNET_YES ==
4146 hash))
4147 {
4149 "Skipped sending duplicate demand\n");
4150 continue;
4151 }
4152
4155 op->demanded_hashes,
4156 hash,
4157 NULL,
4159
4161 "[OP %p] Requesting element (hash %s)\n",
4162 op, GNUNET_h2s (hash));
4163
4164#if MEASURE_PERFORMANCE
4165 perf_store.demand.sent += 1;
4166 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4167#endif
4168 /* Save send demand message for message control */
4169 if (GNUNET_YES !=
4171 op->message_control_flow,
4173 hash,
4175 {
4177 "Double demand message sent found!\n");
4178 GNUNET_break (0);
4180 return;
4181 }
4182
4183 /* Mark offer as received received */
4184 if (GNUNET_YES !=
4186 op->message_control_flow,
4188 hash,
4190 {
4192 "Double offer message received found!\n");
4193 GNUNET_break (0);
4195 return;
4196 }
4197 /* Mark element to be expected to received */
4198 if (GNUNET_YES !=
4200 op->message_control_flow,
4202 hash,
4204 {
4206 "Element already expected!\n");
4207 GNUNET_break (0);
4209 return;
4210 }
4211 ev = GNUNET_MQ_msg_header_extra (demands,
4212 sizeof(struct GNUNET_HashCode),
4214 GNUNET_memcpy (&demands[1],
4215 hash,
4216 sizeof(struct GNUNET_HashCode));
4217 GNUNET_MQ_send (op->mq, ev);
4218 }
4219 GNUNET_CADET_receive_done (op->channel);
4220}
4221
4222
4229static void
4231 const struct GNUNET_MessageHeader *mh)
4232{
4233 struct Operation *op = cls;
4234
4238 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4239 if (GNUNET_OK !=
4240 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4241 {
4242 GNUNET_break (0);
4244 return;
4245 }
4246
4247 if (op->active_passive_switch_required)
4248 {
4250 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4251 GNUNET_break (0);
4253 return;
4254 }
4255
4256#if MEASURE_PERFORMANCE
4257 perf_store.done.received += 1;
4258#endif
4259 switch (op->phase)
4260 {
4262 /* We got all requests, but still have to send our elements in response. */
4263 op->phase = PHASE_FINISH_WAITING;
4265 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4266 /* The active peer is done sending offers
4267 * and inquiries. This means that all
4268 * our responses to that (demands and offers)
4269 * must be in flight (queued or in mesh).
4270 *
4271 * We should notify the active peer once
4272 * all our demands are satisfied, so that the active
4273 * peer can quit if we gave it everything.
4274 */GNUNET_CADET_receive_done (op->channel);
4275 maybe_finish (op);
4276 return;
4279 "got DONE (as active partner), waiting to finish\n");
4280 /* All demands of the other peer are satisfied,
4281 * and we processed all offers, thus we know
4282 * exactly what our demands must be.
4283 *
4284 * We'll close the channel
4285 * to the other peer once our demands are met.
4286 */op->phase = PHASE_FINISH_CLOSING;
4287 GNUNET_CADET_receive_done (op->channel);
4288 maybe_finish (op);
4289 return;
4290 default:
4291 GNUNET_break_op (0);
4293 return;
4294 }
4295}
4296
4297
4304static void
4306 const struct GNUNET_MessageHeader *mh)
4307{
4308#if MEASURE_PERFORMANCE
4309 perf_store.over.received += 1;
4310#endif
4311 send_client_done (cls);
4312}
4313
4314
4323static struct Operation *
4324get_incoming (uint32_t id)
4325{
4326 for (struct Listener *listener = listener_head;
4327 NULL != listener;
4329 {
4330 for (struct Operation *op = listener->op_head;
4331 NULL != op;
4332 op = op->next)
4333 if (op->suggest_id == id)
4334 return op;
4335 }
4336 return NULL;
4337}
4338
4339
4348static void *
4350 struct GNUNET_SERVICE_Client *c,
4351 struct GNUNET_MQ_Handle *mq)
4352{
4353 struct ClientState *cs;
4354
4355 num_clients++;
4356 cs = GNUNET_new (struct ClientState);
4357 cs->client = c;
4358 cs->mq = mq;
4359 return cs;
4360}
4361
4362
4371static int
4373 const struct GNUNET_HashCode *key,
4374 void *value)
4375{
4376 struct ElementEntry *ee = value;
4377
4378 GNUNET_free (ee);
4379 return GNUNET_YES;
4380}
4381
4382
4390static void
4392 struct GNUNET_SERVICE_Client *client,
4393 void *internal_cls)
4394{
4395 struct ClientState *cs = internal_cls;
4396 struct Operation *op;
4397 struct Listener *listener;
4398 struct Set *set;
4399
4401 "Client disconnected, cleaning up\n");
4402 if (NULL != (set = cs->set))
4403 {
4404 struct SetContent *content = set->content;
4405
4407 "Destroying client's set\n");
4408 /* Destroy pending set operations */
4409 while (NULL != set->ops_head)
4411
4412 /* Destroy operation-specific state */
4413 if (NULL != set->se)
4414 {
4416 set->se = NULL;
4417 }
4418 /* free set content (or at least decrement RC) */
4419 set->content = NULL;
4420 GNUNET_assert (0 != content->refcount);
4421 content->refcount--;
4422 if (0 == content->refcount)
4423 {
4424 GNUNET_assert (NULL != content->elements);
4427 NULL);
4429 content->elements = NULL;
4430 GNUNET_free (content);
4431 }
4432 GNUNET_free (set);
4433 }
4434
4435 if (NULL != (listener = cs->listener))
4436 {
4438 "Destroying client's listener\n");
4440 listener->open_port = NULL;
4441 while (NULL != (op = listener->op_head))
4442 {
4444 "Destroying incoming operation `%u' from peer `%s'\n",
4445 (unsigned int) op->client_request_id,
4446 GNUNET_i2s (&op->peer));
4448 }
4451 listener);
4452 GNUNET_free (listener);
4453 }
4454 GNUNET_free (cs);
4455 num_clients--;
4456 if ( (GNUNET_YES == in_shutdown) &&
4457 (0 == num_clients) )
4458 {
4459 if (NULL != cadet)
4460 {
4462 cadet = NULL;
4463 }
4464 }
4465}
4466
4467
4476static int
4478 const struct OperationRequestMessage *msg)
4479{
4480 struct Operation *op = cls;
4481 struct Listener *listener = op->listener;
4482 const struct GNUNET_MessageHeader *nested_context;
4483
4484 /* double operation request */
4485 if (0 != op->suggest_id)
4486 {
4487 GNUNET_break_op (0);
4488 return GNUNET_SYSERR;
4489 }
4490 /* This should be equivalent to the previous condition, but can't hurt to check twice */
4491 if (NULL == listener)
4492 {
4493 GNUNET_break (0);
4494 return GNUNET_SYSERR;
4495 }
4496 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4497 if ((NULL != nested_context) &&
4498 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4499 {
4500 GNUNET_break_op (0);
4501 return GNUNET_SYSERR;
4502 }
4503 return GNUNET_OK;
4504}
4505
4506
4522static void
4524 const struct OperationRequestMessage *msg)
4525{
4526 struct Operation *op = cls;
4527 struct Listener *listener = op->listener;
4528 const struct GNUNET_MessageHeader *nested_context;
4529 struct GNUNET_MQ_Envelope *env;
4530 struct GNUNET_SETU_RequestMessage *cmsg;
4531
4532 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4533 /* Make a copy of the nested_context (application-specific context
4534 information that is opaque to set) so we can pass it to the
4535 listener later on */
4536 if (NULL != nested_context)
4537 op->context_msg = GNUNET_copy_message (nested_context);
4538 op->remote_element_count = ntohl (msg->element_count);
4539 GNUNET_log (
4541 "Received P2P operation request (port %s) for active listener\n",
4542 GNUNET_h2s (&op->listener->app_id));
4543 GNUNET_assert (0 == op->suggest_id);
4544 if (0 == suggest_id)
4545 suggest_id++;
4546 op->suggest_id = suggest_id++;
4547 GNUNET_assert (NULL != op->timeout_task);
4548 GNUNET_SCHEDULER_cancel (op->timeout_task);
4549 op->timeout_task = NULL;
4552 op->context_msg);
4553 GNUNET_log (
4555 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4556 op->suggest_id,
4557 listener,
4558 listener->cs);
4559 cmsg->accept_id = htonl (op->suggest_id);
4560 cmsg->peer_id = op->peer;
4561 GNUNET_MQ_send (listener->cs->mq,
4562 env);
4563 /* NOTE: GNUNET_CADET_receive_done() will be called in
4564 #handle_client_accept() */
4565}
4566
4567
4576static void
4578 const struct GNUNET_SETU_CreateMessage *msg)
4579{
4580 struct ClientState *cs = cls;
4581 struct Set *set;
4582
4584 "Client created new set for union operation\n");
4585 if (NULL != cs->set)
4586 {
4587 /* There can only be one set per client */
4588 GNUNET_break (0);
4590 return;
4591 }
4592 set = GNUNET_new (struct Set);
4593 {
4594 struct MultiStrataEstimator *se;
4595
4599 if (NULL == se)
4600 {
4602 "Failed to allocate strata estimator\n");
4603 GNUNET_free (set);
4605 return;
4606 }
4607 set->se = se;
4608 }
4609 set->content = GNUNET_new (struct SetContent);
4610 set->content->refcount = 1;
4612 GNUNET_YES);
4613 set->cs = cs;
4614 cs->set = set;
4616}
4617
4618
4628static void
4630{
4631 struct Operation *op = cls;
4632
4633 op->timeout_task = NULL;
4635 "Remote peer's incoming request timed out\n");
4637}
4638
4639
4656static void *
4659 const struct GNUNET_PeerIdentity *source)
4660{
4661 struct Listener *listener = cls;
4662 struct Operation *op;
4663
4665 "New incoming channel\n");
4666 op = GNUNET_new (struct Operation);
4667 op->listener = listener;
4668 op->peer = *source;
4669 op->channel = channel;
4670 op->mq = GNUNET_CADET_get_mq (op->channel);
4671 op->salt = GNUNET_CRYPTO_random_u32 (UINT32_MAX);
4674 op);
4677 op);
4678 return op;
4679}
4680
4681
4698static void
4699channel_end_cb (void *channel_ctx,
4700 const struct GNUNET_CADET_Channel *channel)
4701{
4702 struct Operation *op = channel_ctx;
4703
4704 op->channel = NULL;
4706}
4707
4708
4723static void
4725 const struct GNUNET_CADET_Channel *channel,
4726 int window_size)
4727{
4728 /* FIXME: not implemented, we could do flow control here... */
4729}
4730
4731
4739static void
4741 const struct GNUNET_SETU_ListenMessage *msg)
4742{
4743 struct ClientState *cs = cls;
4744 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4745 GNUNET_MQ_hd_var_size (incoming_msg,
4748 NULL),
4749 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4751 struct IBFMessage,
4752 NULL),
4753 GNUNET_MQ_hd_var_size (union_p2p_elements,
4756 NULL),
4757 GNUNET_MQ_hd_var_size (union_p2p_offer,
4759 struct GNUNET_MessageHeader,
4760 NULL),
4761 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4763 struct InquiryMessage,
4764 NULL),
4765 GNUNET_MQ_hd_var_size (union_p2p_demand,
4767 struct GNUNET_MessageHeader,
4768 NULL),
4769 GNUNET_MQ_hd_fixed_size (union_p2p_done,
4771 struct GNUNET_MessageHeader,
4772 NULL),
4773 GNUNET_MQ_hd_fixed_size (union_p2p_over,
4775 struct GNUNET_MessageHeader,
4776 NULL),
4777 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4779 struct GNUNET_MessageHeader,
4780 NULL),
4781 GNUNET_MQ_hd_var_size (union_p2p_request_full,
4783 struct TransmitFullMessage,
4784 NULL),
4785 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4788 NULL),
4789 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4792 NULL),
4793 GNUNET_MQ_hd_var_size (union_p2p_full_element,
4796 NULL),
4797 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4799 struct TransmitFullMessage,
4800 NULL),
4802 };
4803 struct Listener *listener;
4804
4805 if (NULL != cs->listener)
4806 {
4807 /* max. one active listener per client! */
4808 GNUNET_break (0);
4810 return;
4811 }
4812 listener = GNUNET_new (struct Listener);
4813 listener->cs = cs;
4814 cs->listener = listener;
4815 listener->app_id = msg->app_id;
4818 listener);
4820 "New listener created (port %s)\n",
4821 GNUNET_h2s (&listener->app_id));
4823 &msg->app_id,
4825 listener,
4828 cadet_handlers);
4830}
4831
4832
4840static void
4842 const struct GNUNET_SETU_RejectMessage *msg)
4843{
4844 struct ClientState *cs = cls;
4845 struct Operation *op;
4846
4847 op = get_incoming (ntohl (msg->accept_reject_id));
4848 if (NULL == op)
4849 {
4850 /* no matching incoming operation for this reject;
4851 could be that the other peer already disconnected... */
4853 "Client rejected unknown operation %u\n",
4854 (unsigned int) ntohl (msg->accept_reject_id));
4856 return;
4857 }
4859 "Peer request (app %s) rejected by client\n",
4860 GNUNET_h2s (&cs->listener->app_id));
4863}
4864
4865
4872static int
4874 const struct GNUNET_SETU_ElementMessage *msg)
4875{
4876 /* NOTE: Technically, we should probably check with the
4877 block library whether the element we are given is well-formed */
4878 return GNUNET_OK;
4879}
4880
4881
4888static void
4890 const struct GNUNET_SETU_ElementMessage *msg)
4891{
4892 struct ClientState *cs = cls;
4893 struct Set *set;
4894 struct GNUNET_SETU_Element el;
4895 struct ElementEntry *ee;
4896 struct GNUNET_HashCode hash;
4897
4898 if (NULL == (set = cs->set))
4899 {
4900 /* client without a set requested an operation */
4901 GNUNET_break (0);
4903 return;
4904 }
4906 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
4907 el.size = ntohs (msg->header.size) - sizeof(*msg);
4908 el.data = &msg[1];
4909 el.element_type = ntohs (msg->element_type);
4911 &hash);
4913 &hash);
4914 if (NULL == ee)
4915 {
4917 "Client inserts element %s of size %u\n",
4918 GNUNET_h2s (&hash),
4919 el.size);
4920 ee = GNUNET_malloc (el.size + sizeof(*ee));
4921 ee->element.size = el.size;
4922 GNUNET_memcpy (&ee[1], el.data, el.size);
4923 ee->element.data = &ee[1];
4924 ee->element.element_type = el.element_type;
4925 ee->remote = GNUNET_NO;
4926 ee->generation = set->current_generation;
4927 ee->element_hash = hash;
4930 set->content->elements,
4931 &ee->element_hash,
4932 ee,
4934 }
4935 else
4936 {
4938 "Client inserted element %s of size %u twice (ignored)\n",
4939 GNUNET_h2s (&hash),
4940 el.size);
4941 /* same element inserted twice */
4942 return;
4943 }
4945 get_ibf_key (&ee->element_hash));
4946}
4947
4948
4955static void
4957{
4958 set->content->latest_generation++;
4959 set->current_generation++;
4960}
4961
4962
4972static int
4974 const struct GNUNET_SETU_EvaluateMessage *msg)
4975{
4976 /* FIXME: suboptimal, even if the context below could be NULL,
4977 there are malformed messages this does not check for... */
4978 return GNUNET_OK;
4979}
4980
4981
4990static void
4992 const struct GNUNET_SETU_EvaluateMessage *msg)
4993{
4994 struct ClientState *cs = cls;
4995 struct Operation *op = GNUNET_new (struct Operation);
4996
4997 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4998 GNUNET_MQ_hd_var_size (incoming_msg,
5001 op),
5002 GNUNET_MQ_hd_var_size (union_p2p_ibf,
5004 struct IBFMessage,
5005 op),
5006 GNUNET_MQ_hd_var_size (union_p2p_elements,
5009 op),
5010 GNUNET_MQ_hd_var_size (union_p2p_offer,
5012 struct GNUNET_MessageHeader,
5013 op),
5014 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
5016 struct InquiryMessage,
5017 op),
5018 GNUNET_MQ_hd_var_size (union_p2p_demand,
5020 struct GNUNET_MessageHeader,
5021 op),
5022 GNUNET_MQ_hd_fixed_size (union_p2p_done,
5024 struct GNUNET_MessageHeader,
5025 op),
5026 GNUNET_MQ_hd_fixed_size (union_p2p_over,
5028 struct GNUNET_MessageHeader,
5029 op),
5030 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
5032 struct GNUNET_MessageHeader,
5033 op),
5034 GNUNET_MQ_hd_var_size (union_p2p_request_full,
5036 struct TransmitFullMessage,
5037 op),
5038 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5041 op),
5042 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5045 op),
5046 GNUNET_MQ_hd_var_size (union_p2p_full_element,
5049 op),
5050 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5052 struct TransmitFullMessage,
5053 NULL),
5055 };
5056 struct Set *set;
5057 const struct GNUNET_MessageHeader *context;
5058
5059 if (NULL == (set = cs->set))
5060 {
5061 GNUNET_break (0);
5062 GNUNET_free (op);
5064 return;
5065 }
5066 op->salt = GNUNET_CRYPTO_random_u32 (UINT32_MAX);
5067 op->peer = msg->target_peer;
5068 op->client_request_id = ntohl (msg->request_id);
5069 op->byzantine = msg->byzantine;
5070 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5071 op->force_full = msg->force_full;
5072 op->force_delta = msg->force_delta;
5073 op->symmetric = msg->symmetric;
5074 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5075 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5076 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5077 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5078 op->active_passive_switch_required = false;
5080
5081 /* create hashmap for message control */
5082 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5083 GNUNET_NO);
5084 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5085
5086#if MEASURE_PERFORMANCE
5087 /* load config */
5088 load_config (op);
5089#endif
5090
5091 /* Advance generation values, so that
5092 mutations won't interfere with the running operation. */
5093 op->set = set;
5094 op->generation_created = set->current_generation;
5095 advance_generation (set);
5097 set->ops_tail,
5098 op);
5100 "Creating new CADET channel to port %s for set union\n",
5101 GNUNET_h2s (&msg->app_id));
5103 op,
5104 &msg->target_peer,
5105 &msg->app_id,
5108 cadet_handlers);
5109 op->mq = GNUNET_CADET_get_mq (op->channel);
5110 {
5111 struct GNUNET_MQ_Envelope *ev;
5112 struct OperationRequestMessage *msg_tmp;
5113
5114#if MEASURE_PERFORMANCE
5115 perf_store.operation_request.sent += 1;
5116#endif
5117 ev = GNUNET_MQ_msg_nested_mh (msg_tmp,
5119 context);
5120 if (NULL == ev)
5121 {
5122 /* the context message is too large */
5123 GNUNET_break (0);
5125 return;
5126 }
5127 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5128 GNUNET_NO);
5129 /* copy the current generation's strata estimator for this operation */
5130 op->se = strata_estimator_dup (op->set->se);
5131 /* we started the operation, thus we have to send the operation request */
5132 op->phase = PHASE_EXPECT_SE;
5133
5134 op->salt_receive = (op->peer_site + 1) % 2;
5135 op->salt_send = op->peer_site; // FIXME?????
5136
5137
5139 "Initiating union operation evaluation\n");
5141 "# of total union operations",
5142 1,
5143 GNUNET_NO);
5145 "# of initiated union operations",
5146 1,
5147 GNUNET_NO);
5148 GNUNET_MQ_send (op->mq,
5149 ev);
5150 if (NULL != context)
5152 "sent op request with context message\n");
5153 else
5155 "sent op request without context message\n");
5158 op->key_to_element);
5159
5160 }
5162}
5163
5164
5171static void
5173 const struct GNUNET_SETU_CancelMessage *msg)
5174{
5175 struct ClientState *cs = cls;
5176 struct Set *set;
5177 struct Operation *op;
5178 int found;
5179
5180 if (NULL == (set = cs->set))
5181 {
5182 /* client without a set requested an operation */
5183 GNUNET_break (0);
5185 return;
5186 }
5187 found = GNUNET_NO;
5188 for (op = set->ops_head; NULL != op; op = op->next)
5189 {
5190 if (op->client_request_id == ntohl (msg->request_id))
5191 {
5192 found = GNUNET_YES;
5193 break;
5194 }
5195 }
5196 if (GNUNET_NO == found)
5197 {
5198 /* It may happen that the operation was already destroyed due to
5199 * the other peer disconnecting. The client may not know about this
5200 * yet and try to cancel the (just barely non-existent) operation.
5201 * So this is not a hard error.
5202 *///
5204 "Client canceled non-existent op %u\n",
5205 (uint32_t) ntohl (msg->request_id));
5206 }
5207 else
5208 {
5210 "Client requested cancel for op %u\n",
5211 (uint32_t) ntohl (msg->request_id));
5213 }
5215}
5216
5217
5226static void
5228 const struct GNUNET_SETU_AcceptMessage *msg)
5229{
5230 struct ClientState *cs = cls;
5231 struct Set *set;
5232 struct Operation *op;
5233 struct GNUNET_SETU_ResultMessage *result_message;
5234 struct GNUNET_MQ_Envelope *ev;
5235 struct Listener *listener;
5236
5237 if (NULL == (set = cs->set))
5238 {
5239 /* client without a set requested to accept */
5240 GNUNET_break (0);
5242 return;
5243 }
5244 op = get_incoming (ntohl (msg->accept_reject_id));
5245 if (NULL == op)
5246 {
5247 /* It is not an error if the set op does not exist -- it may
5248 * have been destroyed when the partner peer disconnected. */
5249 GNUNET_log (
5251 "Client %p accepted request %u of listener %p that is no longer active\n",
5252 cs,
5253 ntohl (msg->accept_reject_id),
5254 cs->listener);
5255 ev = GNUNET_MQ_msg (result_message,
5257 result_message->request_id = msg->request_id;
5258 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5259 GNUNET_MQ_send (set->cs->mq, ev);
5261 return;
5262 }
5264 "Client accepting request %u\n",
5265 (uint32_t) ntohl (msg->accept_reject_id));
5266 listener = op->listener;
5267 op->listener = NULL;
5269 listener->op_tail,
5270 op);
5271 op->set = set;
5273 set->ops_tail,
5274 op);
5275 op->client_request_id = ntohl (msg->request_id);
5276 op->byzantine = msg->byzantine;
5277 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5278 op->force_full = msg->force_full;
5279 op->force_delta = msg->force_delta;
5280 op->symmetric = msg->symmetric;
5281 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5282 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5283 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5284 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5285 op->active_passive_switch_required = false;
5286 /* create hashmap for message control */
5287 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5288 GNUNET_NO);
5289 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5290
5291#if MEASURE_PERFORMANCE
5292 /* load config */
5293 load_config (op);
5294#endif
5295
5296 /* Advance generation values, so that future mutations do not
5297 interfere with the running operation. */
5298 op->generation_created = set->current_generation;
5299 advance_generation (set);
5300 GNUNET_assert (NULL == op->se);
5301
5303 "accepting set union operation\n");
5305 "# of accepted union operations",
5306 1,
5307 GNUNET_NO);
5309 "# of total union operations",
5310 1,
5311 GNUNET_NO);
5312 {
5313 struct MultiStrataEstimator *se;
5314 struct GNUNET_MQ_Envelope *ev_tmp;
5315 struct StrataEstimatorMessage *strata_msg;
5316 char *buf;
5317 size_t len;
5318 uint16_t type;
5319
5320 op->se = strata_estimator_dup (op->set->se);
5321 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5322 GNUNET_NO);
5323 op->salt_receive = (op->peer_site + 1) % 2;
5324 op->salt_send = op->peer_site; // FIXME?????
5327 op->key_to_element);
5328
5329 /* kick off the operation */
5330 se = op->se;
5331
5332 {
5333 uint8_t se_count = 1;
5334 if (op->initial_size > 0)
5335 {
5336 op->total_elements_size_local = 0;
5337 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5338 &
5340 op);
5342 op->total_elements_size_local / op->initial_size,
5343 op->initial_size);
5344 }
5346 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5347 len = strata_estimator_write (se,
5349 se_count,
5350 buf);
5351#if MEASURE_PERFORMANCE
5352 perf_store.se.sent += 1;
5353 perf_store.se.sent_var_bytes += len;
5354#endif
5355
5356 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5359 else
5361 ev_tmp = GNUNET_MQ_msg_extra (strata_msg,
5362 len,
5363 type);
5364 GNUNET_memcpy (&strata_msg[1],
5365 buf,
5366 len);
5367 GNUNET_free (buf);
5368 strata_msg->set_size
5370 op->set->content->elements));
5371 strata_msg->se_count = se_count;
5372 GNUNET_MQ_send (op->mq,
5373 ev_tmp);
5374 op->phase = PHASE_EXPECT_IBF;
5375 }
5376 }
5377 /* Now allow CADET to continue, as we did not do this in
5378 #handle_incoming_msg (as we wanted to first see if the
5379 local client would accept the request). */
5380 GNUNET_CADET_receive_done (op->channel);
5382}
5383
5384
5390static void
5391shutdown_task (void *cls)
5392{
5393 /* Delay actual shutdown to allow service to disconnect clients */
5395 if (0 == num_clients)
5396 {
5397 if (NULL != cadet)
5398 {
5400 cadet = NULL;
5401 }
5402 }
5404 GNUNET_YES);
5406 "handled shutdown request\n");
5407#if MEASURE_PERFORMANCE
5408 calculate_perf_store ();
5409#endif
5410}
5411
5412
5421static void
5422run (void *cls,
5423 const struct GNUNET_CONFIGURATION_Handle *cfg,
5425{
5426 /* FIXME: need to modify SERVICE (!) API to allow
5427 us to run a shutdown task *after* clients were
5428 forcefully disconnected! */
5430 NULL);
5432 cfg);
5434 if (NULL == cadet)
5435 {
5437 _ ("Could not connect to CADET service\n"));
5439 return;
5440 }
5441}
5442
5443
5449 "set",
5451 &run,
5454 NULL,
5455 GNUNET_MQ_hd_fixed_size (client_accept,
5458 NULL),
5459 GNUNET_MQ_hd_var_size (client_set_add,
5462 NULL),
5463 GNUNET_MQ_hd_fixed_size (client_create_set,
5466 NULL),
5467 GNUNET_MQ_hd_var_size (client_evaluate,
5470 NULL),
5471 GNUNET_MQ_hd_fixed_size (client_listen,
5474 NULL),
5475 GNUNET_MQ_hd_fixed_size (client_reject,
5478 NULL),
5479 GNUNET_MQ_hd_fixed_size (client_cancel,
5482 NULL),
5484
5485
5486/* end of gnunet-service-setu.c */
struct GNUNET_MessageHeader * msg
Definition 005.c:2
struct GNUNET_MQ_Envelope * env
Definition 005.c:1
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition ibf.c:357
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition ibf.c:229
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition ibf.c:324
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition ibf.c:291
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition ibf.c:168
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition ibf.c:380
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition ibf.c:80
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition ibf.c:404
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition ibf.h:72
char * getenv()
static struct GNUNET_ARM_Operation * op
Current operation.
Definition gnunet-arm.c:143
static int ret
Final status code.
Definition gnunet-arm.c:93
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition gnunet-arm.c:108
static unsigned int phase
Processing stage that we are in.
Definition gnunet-arm.c:113
static struct GNUNET_CADET_Handle * mh
Cadet handle.
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_FS_Handle * ctx
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
struct GNUNET_SCHEDULER_Task * shutdown_task
static char * name
Name (label) of the records to list.
static char * res
Currently read line or NULL on EOF.
static char * value
Value of the record to add/remove.
static uint32_t type
Type string converted to DNS type value.
static int status
The program status; 0 for success.
Definition gnunet-nse.c:39
static struct GNUNET_IDENTITY_EgoLookup * el
Handle for our ego lookup.
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
UnionOperationPhase
Current phase we are in for a union operation.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
#define IBF_MIN_SIZE
Minimal size of an ibf Based on the bsc thesis of Elias Summermatter (2021)
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
@ MSG_CFS_EXPECTED
Track that receiving this message is expected.
@ MSG_CFS_SENT
Track that a message has been sent.
@ MSG_CFS_UNINITIALIZED
Initial message state.
@ MSG_CFS_RECEIVED
Track that message has been received.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
#define PROBABILITY_FOR_NEW_ROUND
Is the estimated probability for a new round this values is based on the bsc thesis of Elias Summerma...
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a over message from a remote peer.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
MESSAGE_TYPE
Message types to track in message control flow.
@ DEMAND_MESSAGE
Demand message type.
@ OFFER_MESSAGE
Offer message type.
@ ELEMENT_MESSAGE
Element message type.
static enum GNUNET_GenericReturnValue check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates the if a message is received in a correct phase.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static enum GNUNET_GenericReturnValue free_values_iter(void *cls, const struct GNUNET_HashCode *key, void *value)
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
#define LOG(kind,...)
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate if a message in differential sync si already received before.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
MODE_OF_OPERATION
Different modes of operations.
@ DIFFERENTIAL_SYNC
Mode just synchronizes the difference between sets.
@ FULL_SYNC_LOCAL_SENDING_FIRST
Mode send full set sending local set first.
@ FULL_SYNC_REMOTE_SENDING_FIRST
Mode request full set from remote peer.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
UnionOperationPhase
Current phase we are in for a union operation.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_EXPECT_IBF_LAST
Continuation for multi part IBFs.
@ PHASE_FULL_RECEIVING
Phase that receives full set first and then sends elements that are the local peer missing.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_FINISHED
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_PASSIVE_DECODING
The other peer is decoding the IBF we just sent.
@ PHASE_ACTIVE_DECODING
We are decoding an IBF.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
static unsigned int ibf_size
CADET service; establish channels to distant peers.
Constants for network protocols.
Two-peer set union operations.
API to create, modify and access statistics.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition cadet_api.c:897
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition cadet_api.c:875
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition cadet_api.c:833
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected channel.
Definition cadet_api.c:1081
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition cadet_api.c:966
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition cadet_api.c:777
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition cadet_api.c:804
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition cadet_api.c:1030
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(const struct GNUNET_OS_ProjectData *pd)
Create a new configuration object.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
uint64_t GNUNET_CRYPTO_random_u64(uint64_t max)
Generate a random unsigned 64-bit value.
uint32_t GNUNET_CRYPTO_random_u32(uint32_t i)
Produce a random value.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
#define GNUNET_CRYPTO_hkdf_gnunet(result, out_len, xts, xts_len, skm, skm_len,...)
A peculiar HKDF instantiation that tried to mimic Truncated NMAC.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
, ' bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
If a value with the given key exists, replace it.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_log(kind,...)
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
void GNUNET_CRYPTO_hash_context_finish(struct GNUNET_HashContext *hc, struct GNUNET_HashCode *r_hash)
Finish the hash computation.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition mq.c:305
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition mq.c:285
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
const struct GNUNET_OS_ProjectData * GNUNET_OS_project_data_gnunet(void)
Return default project data used by 'libgnunetutil' for GNUnet.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
Signals other peer that all elements are sent.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition scheduler.c:572
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition scheduler.c:1345
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition scheduler.c:986
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition scheduler.c:1283
#define GNUNET_SERVICE_MAIN(pd, service_name, service_options, init_cb, connect_cb, disconnect_cb, cls,...)
Creates the "main" function for a GNUnet service.
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition service.c:2463
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition service.c:2434
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition setu_api.c:890
GNUNET_SETU_Status
Status for the result callback.
@ GNUNET_SETU_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETU_STATUS_ADD_REMOTE
Element should be added to the result set of the remote peer, i.e.
@ GNUNET_SETU_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETU_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
static unsigned int size
Size of the "table".
Definition peer.c:68
#define _(String)
GNU gettext support macro.
Definition platform.h:179
uint32_t number
static struct GNUNET_MQ_Handle * mq
Our connection to the resolver service, created on-demand, but then persists until error or shutdown.
uint8_t ibf_get_max_counter(struct InvertibleBloomFilter *ibf)
Returns the minimal bytes needed to store the counter of the IBF.
Definition ibf.c:287
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
unsigned int generation
First generation that includes this element.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition arm_api.c:45
Opaque handle to a channel.
Definition cadet.h:116
Opaque handle to the service.
Definition cadet_api.c:39
Opaque handle to a port.
Definition cadet_api.c:80
Internal representation of the hash map.
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition mq.c:87
Message handler for a specific message type.
Header for all communications.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition scheduler.c:141
Handle to a client that is connected to a service.
Definition service.c:249
Handle to a service.
Definition service.c:116
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition setu.h:79
Sent to the service by the client in order to cancel a set operation.
Definition setu.h:350
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition setu.h:41
Message sent by client to the service to add an element to the set.
Definition setu.h:326
uint16_t element_type
Type of the element to add or remove.
Definition setu.h:335
uint16_t reserved
For alignment, always zero.
Definition setu.h:340
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition setu.h:330
Element stored in a set.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition setu.h:203
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition setu.h:56
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition setu.h:158
A request for an operation with another client.
Definition setu.h:175
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition setu.h:190
uint32_t accept_id
ID of the to identify the request when accepting or rejecting it.
Definition setu.h:185
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition setu.h:290
uint64_t current_size
Current set size.
Definition setu.h:299
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition setu.h:310
uint16_t element_type
Type of the element attached to the message, if any.
Definition setu.h:315
uint32_t request_id
id the result belongs to
Definition setu.h:304
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Context for op_get_element_iterator.
struct GNUNET_HashCode hash
FIXME.
struct KeyEntry * k
FIXME.
Message containing buckets of an invertible bloom filter.
Hash of an IBF key.
Definition ibf.h:55
Keys that can be inserted into and removed from an IBF.
Definition ibf.h:46
uint64_t key_val
Definition ibf.h:47
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Invertible bloom filter (IBF).
Definition ibf.h:83
int remote_decoded_count
If an IBF is decoded this count stores how many elements are on the remote site.
Definition ibf.h:108
int local_decoded_count
If an IBF is decoded this count stores how many elements are on the local site.
Definition ibf.h:101
uint32_t size
How many cells does this IBF have?
Definition ibf.h:87
The key entry is used to associate an ibf key with an element.
struct ElementEntry * element
The actual element associated with the key.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
struct StrataEstimator ** stratas
Array of strata estimators.
Operation context used to execute a set operation.
unsigned int generation_created
Generation in which the operation handle was created.
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
struct GNUNET_CONTAINER_MultiHashMap * message_control_flow
Hashmap to keep track of the send/received messages.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
int force_delta
Always use delta operation instead of sending full sets, even it it's less efficient.
int force_full
Always send full sets, even if delta operations would be more efficient.
uint8_t ibf_bucket_number_factor
Set difference is multiplied with this factor to gennerate large enough IBF.
uint64_t total_elements_size_local
Total size of local set.
uint64_t local_set_diff
Estimated or committed set difference at the start.
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
uint64_t ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
uint8_t mode_of_operation
Mode of operation that was chosen by the algorithm.
uint32_t salt_send
Salt that we're using for sending IBFs.
uint8_t differential_sync_iterations
is the count of already passed differential sync iterations
uint64_t rtt_bandwidth_tradeoff
User defined Bandwidth Round Trips Tradeoff.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
uint8_t ibf_number_buckets_per_element
Number of Element per bucket in IBF.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
bool active_passive_switch_required
Boolean to enforce an active passive switch.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
int client_done_sent
Did we send the client that we are done?
uint8_t peer_site
Defines which site a client is 0 = Initiating peer 1 = Receiving peer.
struct MultiStrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
enum UnionOperationPhase phase
Current state of the operation.
uint64_t local_element_count
Local peer element count.
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint64_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct Listener * listener
Port this operation runs on.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet.
uint64_t initial_size
Initial size of our set, just before the operation started.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
uint32_t salt
Salt to use for the operation.
uint64_t byzantine_upper_bound
Limit of number of elements in set.
uint32_t remote_element_count
Remote peers element count.
uint64_t remote_set_diff
Estimated or committed set difference at the start.
uint32_t client_request_id
ID used to identify an operation between service and client.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
struct GNUNET_CONTAINER_MultiHashMap * inquiries_sent
Hashmap to keep track of the send/received inquiries (ibf keys)
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
uint32_t received_total
Total number of elements received from the other peer.
Used as a closure for sending elements with a specific IBF key.
struct Operation * op
Operation for which the elements should be sent.
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
uint64_t elements_randomized_salt
Salt to construct the randomized element map.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
struct GNUNET_CONTAINER_MultiHashMap * elements_randomized
Maps struct GNUNET_HashCode * to struct ElementEntry * randomized.
unsigned int refcount
Number of references to the content.
A set that supports a specific operation with other peers.
struct MultiStrataEstimator * se
The strata estimator is only generated once for each set.
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct Set * prev
Sets are held in a doubly linked list.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
struct ClientState * cs
Client that owns the set.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...
Strata estimator together with the peer's overall set size.
uint64_t set_size
Size of the local set.
uint8_t se_count
The number of ses transmitted.
struct InvertibleBloomFilter ** strata
The IBFs of this strata estimator.
unsigned int strata_count
Size of the IBF array in strata.
Message which signals to other peer that we are sending full set.
uint32_t local_set_difference
Local set difference calculated with strata estimator.
uint32_t remote_set_difference
Remote set difference calculated with strata estimator.
uint32_t remote_set_size
Total remote set size.
Struct to tracked messages in message control flow.
enum MESSAGE_CONTROL_FLOW_STATE element
Track the message control state of the element message.
enum MESSAGE_CONTROL_FLOW_STATE offer
Track the message control state of the offer message.
enum MESSAGE_CONTROL_FLOW_STATE demand
Track the message control state of the demand message.