GNUnet 0.22.2
gnunet-service-setu.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
30#include "ibf.h"
31#include "gnunet_protocols.h"
36#include <gcrypt.h>
37#include "gnunet_setu_service.h"
38#include "setu.h"
39
/* Log under the component name "setu": forwards the kind and the
 * format arguments to GNUNET_log_from. */
40#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
41
/* One GNUNET_TIME_UNIT_MINUTES.
 * NOTE(review): presumably the time an incoming channel may wait for a
 * listener; the usage is not visible in this part of the file. */
46#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
47
/* NOTE(review): presumably the number of strata of the strata estimator;
 * not referenced in the visible code - confirm. */
51#define SE_STRATA_COUNT 32
52
53
/* NOTE(review): presumably the total number of IBF buckets spread over
 * the strata estimator; not referenced in the visible code - confirm. */
58#define SE_IBFS_TOTAL_SIZE 632
59
/* NOTE(review): only appears in a commented-out ibf_create() call in the
 * visible code, where it is the second (hash number) argument. */
63#define SE_IBF_HASH_NUM 3
64
/* Cap on the number of IBF buckets placed into one message: 2^16 bytes
 * divided by the size of one bucket.  The IBF sending loop limits every
 * chunk to this value. */
68#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
69
/* NOTE(review): presumably the largest IBF size accepted; the check that
 * uses it is not visible in this part of the file. */
75#define MAX_IBF_SIZE 1048576
76
77
/* Smallest IBF size used: the sending code raises smaller requested
 * sizes to this value, and the mode estimation uses it as a floor for
 * the estimated bucket count. */
82#define IBF_MIN_SIZE 37
83
/* NOTE(review): presumably the mean number of round trips assumed for a
 * differential sync; its usage is not visible here - confirm. */
88#define DIFFERENTIAL_RTT_MEAN 3.65145
89
/* Bound used by the plausibility checks: the full-sync check rejects
 * values outside [-SECURITY_LEVEL, SECURITY_LEVEL], the differential
 * round check rejects values below -SECURITY_LEVEL. */
94#define SECURITY_LEVEL 80
95
/* NOTE(review): presumably the per-round probability used by the check
 * on the number of differential sync rounds; usage not visible here. */
101#define PROBABILITY_FOR_NEW_ROUND 0.15
102
/* Set to 1 to compile in the performance measurement code that is
 * guarded by "#if MEASURE_PERFORMANCE". */
107#define MEASURE_PERFORMANCE 0
108
109
114{
119
130
135
140
145
151
157
163
169
176
182{
187
192
198
199
206struct ElementEntry
207{
213
219
223 unsigned int generation;
224
229 int remote;
230};
231
232
237struct Listener;
238
239
243struct Set;
244
245
249struct ClientState
250{
254 struct Set *set;
255
259 struct Listener *listener;
260
265
269 struct GNUNET_MQ_Handle *mq;
270};
271
272
276struct Operation
277{
278
284
288 uint64_t initial_size;
289
293 struct Operation *next;
294
298 struct Operation *prev;
299
304
308 struct Listener *listener;
309
313 struct GNUNET_MQ_Handle *mq;
314
319
324 struct Set *set;
325
331
336
341
348
354
359
364
369
374
378 uint32_t salt_send;
379
383 uint32_t salt_receive;
384
390
395
399 uint32_t salt;
400
404 uint32_t remote_element_count;
405
409 uint32_t client_request_id;
410
415 int force_delta;
416
421 int force_full;
422
427 int byzantine;
428
434
440
446 uint32_t suggest_id;
447
452 unsigned int generation_created;
453
454
459
460
465
466
472
478 uint8_t peer_site;
479
480
485
490
495
500
501
506
511
516
521
526
531};
532
533
538struct SetContent
539{
544
549
554
558 unsigned int refcount;
559
563 unsigned int latest_generation;
564
568 int iterator_count;
569};
570
571
575struct Set
576{
580 struct Set *next;
581
585 struct Set *prev;
586
591 struct ClientState *cs;
592
597 struct SetContent *content;
598
604
608 struct Operation *ops_head;
609
613 struct Operation *ops_tail;
614
619 unsigned int current_generation;
620
621};
622
623
/**
 * Entry of an operation's key_to_element map.  The registration code
 * further down allocates one per element and stores it under the lower
 * 32 bits of the IBF key.
 */
627struct KeyEntry
628{
  /* IBF key of the element; it is salted with the operation's send salt
   * before being inserted into the local IBF. */
632 struct IBF_Key ibf_key;
633
  /* The element this key belongs to.  The map's destroy iterator frees
   * it only when element->remote is GNUNET_YES. */
640 struct ElementEntry *element;
641
  /* Copied from the "received" argument at registration time; elements
   * taken from the local set are registered with GNUNET_NO.
   * NOTE(review): presumably GNUNET_YES marks elements received from the
   * remote peer - confirm against the callers. */
647 int received;
648};
649
650
656{
661 struct IBF_Key ibf_key;
662
667 struct Operation *op;
668};
669
670
675struct Listener
676{
680 struct Listener *next;
681
685 struct Listener *prev;
686
692 struct Operation *op_head;
693
699 struct Operation *op_tail;
700
705 struct ClientState *cs;
706
711
716 struct GNUNET_HashCode app_id;
717
718};
719
720
726
731
/* NOTE(review): struct Listener carries next/prev pointers, so this is
 * presumably the head of a doubly-linked list of listeners - confirm. */
735static struct Listener *listener_head;
736
/* NOTE(review): presumably the tail of the same listener list. */
740static struct Listener *listener_tail;
741
/* NOTE(review): presumably the number of connected clients; the code
 * updating it is not visible in this part of the file. */
745static unsigned int num_clients;
746
/* NOTE(review): presumably a GNUNET_YES/GNUNET_NO flag set once shutdown
 * has started; the code using it is not visible here. */
751static int in_shutdown;
752
/* NOTE(review): presumably a counter for generating the suggest_id of an
 * operation (struct Operation has a field of the same name) - confirm. */
758static uint32_t suggest_id;
759
760#if MEASURE_PERFORMANCE
765static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
766
767
/**
 * Traffic counters for one kind of message (performance measurement
 * builds only).  sum_sent_received_bytes() multiplies the message counts
 * with a fixed per-message size and adds the variable byte counts.
 */
773struct perf_num_send_received_msg
774{
  /* number of messages sent */
775 uint64_t sent;
  /* variable-size payload bytes sent in addition to the fixed part */
776 uint64_t sent_var_bytes;
  /* number of messages received */
777 uint64_t received;
  /* variable-size payload bytes received in addition to the fixed part */
778 uint64_t received_var_bytes;
779};
780
/**
 * Collected performance data of a run (performance measurement builds
 * only): one set of traffic counters per kind of message, followed by
 * the values that the evaluation code writes to its CSV output.
 */
784struct per_store_struct
785{
786 struct perf_num_send_received_msg operation_request;
787 struct perf_num_send_received_msg se;
788 struct perf_num_send_received_msg request_full;
789 struct perf_num_send_received_msg element_full;
790 struct perf_num_send_received_msg full_done;
791 struct perf_num_send_received_msg ibf;
792 struct perf_num_send_received_msg inquery;
793 struct perf_num_send_received_msg element;
794 struct perf_num_send_received_msg demand;
795 struct perf_num_send_received_msg offer;
796 struct perf_num_send_received_msg done;
797 struct perf_num_send_received_msg over;
  /* written to the CSV output as-is; NOTE(review): presumably the
   * estimated set difference (total / remote / local) - confirm */
798 uint64_t se_diff;
799 uint64_t se_diff_remote;
800 uint64_t se_diff_local;
  /* the evaluation adds half a round trip per switch and reports the run
   * as "decoded" when this is 0 */
801 uint64_t active_passive_switches;
  /* written to the CSV output as-is */
802 uint8_t mode_of_operation;
803};
804
805struct per_store_struct perf_store;
806#endif
807
813{
818
823
828
833};
834
840{
845
850
855};
856
857
862{
875};
876
877
878#if MEASURE_PERFORMANCE
879
885static void
886load_config (struct Operation *op)
887{
888 long long number;
889 float fl;
890
891 setu_cfg = GNUNET_CONFIGURATION_create ();
893 "perf_setu.conf");
895 "IBF",
896 "BUCKET_NUMBER_FACTOR",
897 &fl);
898 op->ibf_bucket_number_factor = fl;
900 "IBF",
901 "NUMBER_PER_BUCKET",
902 &number);
903 op->ibf_number_buckets_per_element = number;
905 "PERFORMANCE",
906 "TRADEOFF",
907 &number);
908 op->rtt_bandwidth_tradeoff = number;
910 "BOUNDARIES",
911 "UPPER_ELEMENT",
912 &number);
913 op->byzantine_upper_bound = number;
914 op->peer_site = 0;
915}
916
917
924static int
925sum_sent_received_bytes (uint64_t size,
926 struct perf_num_send_received_msg
927 perf_num_send_received_msg)
928{
929 return (size * perf_num_send_received_msg.sent)
930 + (size * perf_num_send_received_msg.received)
931 + perf_num_send_received_msg.sent_var_bytes
932 + perf_num_send_received_msg.received_var_bytes;
933}
934
935
939static void
940calculate_perf_store ()
941{
942
946 float rtt = 1;
947 int bytes_transmitted = 0;
948
952 if ((perf_store.element_full.received != 0) ||
953 (perf_store.element_full.sent != 0)
954 )
955 rtt += 1;
956
957 if ((perf_store.request_full.received != 0) ||
958 (perf_store.request_full.sent != 0)
959 )
960 rtt += 0.5;
961
966 if ((perf_store.element.received != 0) ||
967 (perf_store.element.sent != 0))
968 {
969 int iterations = perf_store.active_passive_switches;
970
971 if (iterations > 0)
972 rtt += iterations * 0.5;
973 rtt += 2.5;
974 }
975
976
980 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
982 perf_store.request_full);
983
984 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
986 perf_store.element_full);
987 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
989 perf_store.element);
990 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
991 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
993 perf_store.se);
994 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
995 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
996 perf_store.ibf);
997 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
998 perf_store.inquery);
999 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1001 perf_store.demand);
1002 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1004 perf_store.offer);
1005 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1006
1010 float factor;
1011 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1012 &factor);
1013 long long num_per_bucket;
1014 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1015 &num_per_bucket);
1016
1017
1018 int decoded = 0;
1019 if (perf_store.active_passive_switches == 0)
1020 decoded = 1;
1021 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1022 IBFMessage),
1023 perf_store.ibf);
1024
1025 FILE *out1 = fopen ("perf_data.csv", "a");
1026 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1027 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1028 bytes_transmitted,
1029 perf_store.se_diff_local,perf_store.se_diff_remote,
1030 perf_store.mode_of_operation);
1031 fclose (out1);
1032
1033}
1034
1035
1036#endif
1049static uint8_t
1050estimate_best_mode_of_operation (uint64_t avg_element_size,
1051 uint64_t local_set_size,
1052 uint64_t remote_set_size,
1053 uint64_t est_set_diff_remote,
1054 uint64_t est_set_diff_local,
1055 uint64_t bandwith_latency_tradeoff,
1056 uint64_t ibf_bucket_number_factor)
1057{
1058
1059 /*
1060 * In case of initial sync fall to predefined states
1061 */
1062 {
1063 if (0 == local_set_size)
1065 if (0 == remote_set_size)
1067 }
1068 /*
1069 * Calculate bytes for full Sync
1070 */
1071 {
1072 uint8_t sizeof_full_done_header = 4;
1073 uint8_t sizeof_done_header = 4;
1074 uint8_t rtt_min_full = 2;
1075 uint8_t sizeof_request_full = 4;
1076 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1077
1078 /* Estimate bytes required if we send first */
1079 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1080 + local_set_size;
1081
1082 uint64_t total_bytes_full_local_send_first = (avg_element_size
1083 *
1084 total_elements_to_send_local_send_first) \
1085 + (
1086 total_elements_to_send_local_send_first * sizeof(struct
1088 ) \
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1103 + (sizeof_full_done_header * 2
1104 ) \
1105 + (rtt_min_full + 0.5)
1106 * bandwith_latency_tradeoff \
1107 + sizeof_request_full;
1108
1109 /*
1110 * Calculate bytes for differential Sync
1111 */
1112
1113 /* Estimate bytes required by IBF transmission*/
1114
1115 long double ibf_bucket_count = estimated_total_diff
1116 * ibf_bucket_number_factor;
1117
1118 if (ibf_bucket_count <= IBF_MIN_SIZE)
1119 {
1120 ibf_bucket_count = IBF_MIN_SIZE;
1121 }
1122 {
1123 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1124 / ((float) MAX_BUCKETS_PER_MESSAGE));
1125
1126 uint64_t estimated_counter_size = ceil (
1127 MIN (2 * log2l (((float) local_set_size)
1128 / ((float) ibf_bucket_count)),
1129 log2l (local_set_size)));
1130
1131 long double counter_bytes = (float) estimated_counter_size / 8;
1132
1133 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count
1134 )
1135 * 1.2 \
1136 + (ibf_bucket_count * sizeof(struct IBF_Key))
1137 * 1.2 \
1138 + (ibf_bucket_count * sizeof(struct
1139 IBF_KeyHash))
1140 * 1.2 \
1141 + (ibf_bucket_count * counter_bytes) * 1.2);
1142
1143 /* Estimate full byte count for differential sync */
1144 uint64_t element_size = (avg_element_size
1145 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1146 * estimated_total_diff;
1147 uint64_t done_size = sizeof_done_header;
1148 uint64_t inquery_size = (sizeof (struct IBF_Key)
1149 + sizeof (struct InquiryMessage))
1150 * estimated_total_diff;
1151 uint64_t demand_size =
1152 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1153 * estimated_total_diff;
1154 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1155 + sizeof (struct GNUNET_MessageHeader))
1156 * estimated_total_diff;
1157
1158 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1159 + demand_size + offer_size + ibf_bytes) \
1161 * bandwith_latency_tradeoff);
1162
1163 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1164 total_bytes_full_remote_send_first);
1165
1166 /* Decide between full and differential sync */
1167
1168 if (full_min < total_bytes_diff)
1169 {
1170 /* Decide between sending all elements first or receiving all elements */
1171 if (total_bytes_full_remote_send_first >
1172 total_bytes_full_local_send_first
1173 )
1174 {
1176 }
1177 else
1178 {
1180 }
1181 }
1182 else
1183 {
1184 return DIFFERENTIAL_SYNC;
1185 }
1186 }
1187 }
1188}
1189
1190
1199static enum GNUNET_GenericReturnValue
1200check_valid_phase (const uint8_t allowed_phases[],
1201 size_t size_phases,
1202 struct Operation *op)
1203{
1207 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1208 {
1209 uint8_t phase = allowed_phases[phase_ctr];
1210 if (phase == op->phase)
1211 {
1213 "Message received in valid phase\n");
1214 return GNUNET_YES;
1215 }
1216 }
1218 "Received message in invalid phase: %u\n", op->phase);
1219 return GNUNET_NO;
1220}
1221
1222
1234static int
1236 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1237 const struct GNUNET_HashCode *hash_code,
1238 enum MESSAGE_TYPE mt)
1239{
1240 struct messageControlFlowElement *cfe = NULL;
1241 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1242
1247 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1248 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1249 {
1250 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1251 {
1253 "Received an element without sent offer!\n");
1254 return GNUNET_NO;
1255 }
1256 /* Check that only requested elements are received! */
1257 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1258 MSG_CFS_SENT))
1259 {
1261 "Received an element that was not demanded\n");
1262 return GNUNET_NO;
1263 }
1264 }
1265
1270 if (NULL == cfe)
1271 {
1273 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1274 cfe,
1276 {
1277 GNUNET_free (cfe);
1278 return GNUNET_SYSERR;
1279 }
1280 }
1281
1286 if (OFFER_MESSAGE == mt)
1287 {
1288 mcfs = &cfe->offer;
1289 }
1290 else if (DEMAND_MESSAGE == mt)
1291 {
1292 mcfs = &cfe->demand;
1293 }
1294 else if (ELEMENT_MESSAGE == mt)
1295 {
1296 mcfs = &cfe->element;
1297 }
1298 else
1299 {
1300 return GNUNET_SYSERR;
1301 }
1302
1307 if (new_mcfs <= *mcfs)
1308 {
1309 return GNUNET_NO;
1310 }
1311
1312 *mcfs = new_mcfs;
1313 return GNUNET_YES;
1314}
1315
1316
1324static int
1327 struct GNUNET_HashCode *hash_code,
1328 enum MESSAGE_TYPE mt)
1329{
1330 struct messageControlFlowElement *cfe = NULL;
1331 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1332
1333 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1334
1339 if (cfe != NULL)
1340 {
1341 if (OFFER_MESSAGE == mt)
1342 {
1343 mcfs = &cfe->offer;
1344 }
1345 else if (DEMAND_MESSAGE == mt)
1346 {
1347 mcfs = &cfe->demand;
1348 }
1349 else if (ELEMENT_MESSAGE == mt)
1350 {
1351 mcfs = &cfe->element;
1352 }
1353 else
1354 {
1355 return GNUNET_SYSERR;
1356 }
1357
1361 if (*mcfs != MSG_CFS_UNINITIALIZED)
1362 {
1363 return GNUNET_YES;
1364 }
1365 }
1366 return GNUNET_NO;
1367}
1368
1369
1380static int
1382 const struct GNUNET_HashCode *key,
1383 void *value)
1384{
1385 struct messageControlFlowElement *mcfe = value;
1386
1387 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1388 {
1389 return GNUNET_YES;
1390 }
1391 return GNUNET_NO;
1392}
1393
1394
1404static int
1406 const struct GNUNET_HashCode *key,
1407 void *value)
1408{
1409 struct Operation *op = cls;
1410 struct GNUNET_SETU_Element *element = value;
1411 op->total_elements_size_local += element->size;
1412 return GNUNET_YES;
1413}
1414
1415
1425static int
1427 const struct GNUNET_HashCode *key,
1428 void *value)
1429{
1430 struct Operation *op = cls;
1431
1432 struct GNUNET_HashContext *hashed_key_context =
1434 struct GNUNET_HashCode new_key;
1435
1439 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1440 &key,
1441 sizeof(struct IBF_Key));
1442 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1443 &op->set->content->elements_randomized_salt,
1444 sizeof(uint32_t));
1445 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1446 &new_key);
1447 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1448 &new_key,value,
1450 return GNUNET_YES;
1451}
1452
1453
1464static int
1466 uint32_t key,
1467 void *value)
1468{
1469 struct KeyEntry *k = value;
1470
1471 GNUNET_assert (NULL != k);
1472 if (GNUNET_YES == k->element->remote)
1473 {
1474 GNUNET_free (k->element);
1475 k->element = NULL;
1476 }
1477 GNUNET_free (k);
1478 return GNUNET_YES;
1479}
1480
1481
1488static void
1490{
1491 struct Operation *op = cls;
1492 struct GNUNET_MQ_Envelope *ev;
1493 struct GNUNET_SETU_ResultMessage *rm;
1494
1495 if (GNUNET_YES == op->client_done_sent)
1496 return;
1497 if (PHASE_FINISHED != op->phase)
1498 {
1500 "Union operation failed\n");
1502 "# Union operations failed",
1503 1,
1504 GNUNET_NO);
1507 rm->request_id = htonl (op->client_request_id);
1508 rm->element_type = htons (0);
1509 GNUNET_MQ_send (op->set->cs->mq,
1510 ev);
1511 return;
1512 }
1513
1514 op->client_done_sent = GNUNET_YES;
1515
1517 "# Union operations succeeded",
1518 1,
1519 GNUNET_NO);
1521 "Signalling client that union operation is done\n");
1522 ev = GNUNET_MQ_msg (rm,
1524 rm->request_id = htonl (op->client_request_id);
1526 rm->element_type = htons (0);
1528 op->key_to_element));
1529 GNUNET_MQ_send (op->set->cs->mq,
1530 ev);
1531}
1532
1533
1540static int
1542{
1543 if (op->byzantine != GNUNET_YES)
1544 return GNUNET_OK;
1545
1549 if (op->remote_element_count + op->remote_set_diff >
1550 op->byzantine_upper_bound)
1551 return GNUNET_SYSERR;
1552 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1553 return GNUNET_SYSERR;
1554
1558 if (op->remote_element_count < op->byzantine_lower_bound)
1559 return GNUNET_SYSERR;
1560 return GNUNET_OK;
1561}
1562
1563
1564static enum GNUNET_GenericReturnValue
1566 const struct GNUNET_HashCode *key,
1567 void *value)
1568{
1570 return GNUNET_YES;
1571}
1572
1573
1574/* FIXME: the destroy logic is a mess and should be cleaned up! */
1575
1588static void
1590{
1591 struct Set *set = op->set;
1592 struct GNUNET_CADET_Channel *channel;
1593
1595 "Destroying union operation %p\n",
1596 op);
1597 GNUNET_assert (NULL == op->listener);
1598 /* check if the op was canceled twice */
1599 if (NULL != op->remote_ibf)
1600 {
1601 ibf_destroy (op->remote_ibf);
1602 op->remote_ibf = NULL;
1603 }
1604 if (NULL != op->demanded_hashes)
1605 {
1606 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1607 op->demanded_hashes = NULL;
1608 }
1609 if (NULL != op->local_ibf)
1610 {
1611 ibf_destroy (op->local_ibf);
1612 op->local_ibf = NULL;
1613 }
1614 if (NULL != op->se)
1615 {
1617 op->se = NULL;
1618 }
1619 if (NULL != op->key_to_element)
1620 {
1623 NULL);
1625 op->key_to_element = NULL;
1626 }
1627 if (NULL != op->message_control_flow)
1628 {
1629 GNUNET_CONTAINER_multihashmap_iterate (op->message_control_flow,
1631 NULL);
1632 GNUNET_CONTAINER_multihashmap_destroy (op->message_control_flow);
1633 op->message_control_flow = NULL;
1634 }
1635 if (NULL != op->inquiries_sent)
1636 {
1637 GNUNET_CONTAINER_multihashmap_destroy (op->inquiries_sent);
1638 op->inquiries_sent = NULL;
1639 }
1640 if (NULL != set)
1641 {
1643 set->ops_tail,
1644 op);
1645 op->set = NULL;
1646 }
1647 if (NULL != op->context_msg)
1648 {
1649 GNUNET_free (op->context_msg);
1650 op->context_msg = NULL;
1651 }
1652 if (NULL != (channel = op->channel))
1653 {
1654 /* This will free op; called conditionally as this helper function
1655 is also called from within the channel disconnect handler. */
1656 op->channel = NULL;
1658 }
1659 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1660 * there was a channel end handler that will free 'op' on the call stack. */
1661}
1662
1663
1669static void
1671
1672
1678static void
1680{
1681 struct Listener *listener;
1682
1684 "Destroying incoming operation %p\n",
1685 op);
1686 if (NULL != (listener = op->listener))
1687 {
1689 listener->op_tail,
1690 op);
1691 op->listener = NULL;
1692 }
1693 if (NULL != op->timeout_task)
1694 {
1695 GNUNET_SCHEDULER_cancel (op->timeout_task);
1696 op->timeout_task = NULL;
1697 }
1699}
1700
1701
1707static void
1709{
1710 struct GNUNET_CADET_Channel *channel;
1711
1712 if (NULL != (channel = op->channel))
1713 {
1714 /* This will free op; called conditionally as this helper function
1715 is also called from within the channel disconnect handler. */
1716 op->channel = NULL;
1718 }
1719 if (NULL != op->listener)
1720 {
1722 return;
1723 }
1724 if (NULL != op->set)
1727 GNUNET_free (op);
1728}
1729
1730
1737static void
1739{
1740 struct GNUNET_MQ_Envelope *ev;
1742
1744 "union operation failed\n");
1746 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1747 msg->request_id = htonl (op->client_request_id);
1748 msg->element_type = htons (0);
1749 GNUNET_MQ_send (op->set->cs->mq,
1750 ev);
1752}
1753
1754
1765static void
1767{
1768 int security_level_lb = -1 * SECURITY_LEVEL;
1769 uint64_t duplicates = op->received_fresh - op->received_total;
1770
1771 if (GNUNET_YES != op->byzantine)
1772 return;
1773
1774 /*
1775 * Protect full sync from receiving duplicate elements while in FULL SENDING
1776 */
1777 if (PHASE_FULL_SENDING == op->phase)
1778 {
1779 if (duplicates > 0)
1780 {
1782 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1783 "mode of operation this is not allowed! Duplicates: %llu\n",
1784 (unsigned long long) duplicates);
1785 GNUNET_break_op (0);
1787 return;
1788 }
1789
1790 }
1791
1792 /*
1793 * Protect full sync with probabilistic algorithm
1794 */
1795 if (PHASE_FULL_RECEIVING == op->phase)
1796 {
1797
1798 long double base = (1 - (long double) (op->remote_set_diff
1799 / (long double) (op->initial_size
1800 + op->
1801 remote_set_diff)));
1802 long double exponent = (op->received_total - (op->received_fresh * ((long
1803 double)
1804 op->
1805 initial_size
1806 / (long
1807 double)
1808 op->
1809 remote_set_diff)));
1810 long double value = exponent * (log2l (base) / log2l (2));
1811 if (0 == op->remote_set_diff)
1812 op->remote_set_diff = 1;
1813 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1814 {
1816 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1817 "to many duplicated full element : %LF\n",
1818 value);
1819 GNUNET_break_op (0);
1821 return;
1822 }
1823 }
1824}
1825
1826
1831static void
1833{
1834 double probability = op->differential_sync_iterations * (log2l (
1836 / log2l (2));
1837 if ((-1 * SECURITY_LEVEL) > probability)
1838 {
1840 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1841 "switches in differential sync: %u\n",
1842 op->differential_sync_iterations);
1843 GNUNET_break_op (0);
1845 return;
1846 }
1847}
1848
1849
1857static struct IBF_Key
1858get_ibf_key (const struct GNUNET_HashCode *src)
1859{
1860 struct IBF_Key key;
1861 uint16_t salt = 0;
1862
1864 GNUNET_CRYPTO_kdf (&key, sizeof(key),
1865 src, sizeof *src,
1866 &salt, sizeof(salt),
1867 NULL, 0));
1868 return key;
1869}
1870
1871
/**
 * Closure for looking up a key entry by element hash: the lookup code
 * fills in the hash, the iterator stores the entry it stops at.
 */
1875struct GetElementContext
1876{
  /* Input: copy of the element hash that is being searched for. */
1880 struct GNUNET_HashCode hash;
1881
  /* Output: set by the iterator to the entry at which it aborts the
   * iteration; stays NULL (from the zero initializer) otherwise. */
1885 struct KeyEntry *k;
1886};
1887
1888
1899static int
1901 uint32_t key,
1902 void *value)
1903{
1904 struct GetElementContext *ctx = cls;
1905 struct KeyEntry *k = value;
1906
1907 GNUNET_assert (NULL != k);
1909 &ctx->hash))
1910 {
1911 ctx->k = k;
1912 return GNUNET_NO;
1913 }
1914 return GNUNET_YES;
1915}
1916
1917
1926static struct KeyEntry *
1928 const struct GNUNET_HashCode *element_hash)
1929{
1930 int ret;
1931 struct IBF_Key ibf_key;
1932 struct GetElementContext ctx = { { { 0 } }, 0 };
1933
1934 ctx.hash = *element_hash;
1935
1936 ibf_key = get_ibf_key (element_hash);
1938 (uint32_t) ibf_key.key_val
1939 ,
1941 &ctx);
1942
1943 /* was the iteration aborted because we found the element? */
1944 if (GNUNET_SYSERR == ret)
1945 {
1946 GNUNET_assert (NULL != ctx.k);
1947 return ctx.k;
1948 }
1949 return NULL;
1950}
1951
1952
1967static void
1969 struct ElementEntry *ee,
1970 int received)
1971{
1972 struct IBF_Key ibf_key;
1973 struct KeyEntry *k;
1974
1976 k = GNUNET_new (struct KeyEntry);
1977 k->element = ee;
1978 k->ibf_key = ibf_key;
1979 k->received = received;
1981 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1982 (uint32_t) ibf_key.key_val
1983 ,
1984 k,
1986}
1987
1988
1993static void
1994salt_key (const struct IBF_Key *k_in,
1995 uint32_t salt,
1996 struct IBF_Key *k_out)
1997{
1998 int s = (salt * 7) % 64;
1999 uint64_t x = k_in->key_val;
2000
2001 /* rotate ibf key */
2002 x = (x >> s) | (x << (64 - s));
2003 k_out->key_val = x;
2004}
2005
2006
2010static void
2011unsalt_key (const struct IBF_Key *k_in,
2012 uint32_t salt,
2013 struct IBF_Key *k_out)
2014{
2015 int s = (salt * 7) % 64;
2016 uint64_t x = k_in->key_val;
2017
2018 x = (x << s) | (x >> (64 - s));
2019 k_out->key_val = x;
2020}
2021
2022
2030static int
2032 uint32_t key,
2033 void *value)
2034{
2035 struct Operation *op = cls;
2036 struct KeyEntry *ke = value;
2037 struct IBF_Key salted_key;
2038
2040 "[OP %p] inserting %lx (hash %s) into ibf\n",
2041 op,
2042 (unsigned long) ke->ibf_key.key_val,
2044 salt_key (&ke->ibf_key,
2045 op->salt_send,
2046 &salted_key);
2047 ibf_insert (op->local_ibf, salted_key);
2048 return GNUNET_YES;
2049}
2050
2051
2059static int
2061 struct Operation *op)
2062{
2063 return ee->generation >= op->generation_created;
2064}
2065
2066
2077static int
2079 const struct GNUNET_HashCode *key,
2080 void *value)
2081{
2082 struct Operation *op = cls;
2083 struct ElementEntry *ee = value;
2084
2085 /* make sure that the element belongs to the set at the time
2086 * of creating the operation */
2087 if (GNUNET_NO ==
2089 op))
2090 return GNUNET_YES;
2093 ee,
2094 GNUNET_NO);
2095 return GNUNET_YES;
2096}
2097
2098
2104static void
2106{
2107 unsigned int len;
2108
2109 GNUNET_assert (NULL == op->key_to_element);
2110 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2111 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2112 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2114 op);
2115}
2116
2117
2126static int
2128 uint32_t size)
2129{
2130 GNUNET_assert (NULL != op->key_to_element);
2131
2132 if (NULL != op->local_ibf)
2133 ibf_destroy (op->local_ibf);
2134 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2135 op->local_ibf = ibf_create (size,
2136 ((uint8_t) op->ibf_number_buckets_per_element));
2137 if (NULL == op->local_ibf)
2138 {
2140 "Failed to allocate local IBF\n");
2141 return GNUNET_SYSERR;
2142 }
2145 op);
2146 return GNUNET_OK;
2147}
2148
2149
2159static int
2161 uint32_t ibf_size)
2162{
2163 uint64_t buckets_sent = 0;
2164 struct InvertibleBloomFilter *ibf;
2165
2169 uint32_t ibf_min_size = IBF_MIN_SIZE;
2170
2171 op->differential_sync_iterations++;
2172 if (ibf_size < ibf_min_size)
2173 {
2174 ibf_size = ibf_min_size;
2175 }
2176 if (GNUNET_OK !=
2178 {
2179 /* allocation failed */
2180 return GNUNET_SYSERR;
2181 }
2182
2184 "sending ibf of size %u\n",
2185 (unsigned int) ibf_size);
2186
2187 {
2188 char name[64];
2189
2191 sizeof(name),
2192 "# sent IBF (order %u)",
2193 ibf_size);
2195 }
2196
2197 ibf = op->local_ibf;
2198
2199 while (buckets_sent < ibf_size)
2200 {
2201 unsigned int buckets_in_message;
2202 struct GNUNET_MQ_Envelope *ev;
2203 struct IBFMessage *msg;
2204
2205 buckets_in_message = ibf_size - buckets_sent;
2206 /* limit to maximum */
2207 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2208 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2209
2210#if MEASURE_PERFORMANCE
2211 perf_store.ibf.sent += 1;
2212 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2213#endif
2215 buckets_in_message * IBF_BUCKET_SIZE,
2217 msg->ibf_size = ibf_size;
2218 msg->offset = htonl (buckets_sent);
2219 msg->salt = htonl (op->salt_send);
2220 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2221
2222
2223 ibf_write_slice (ibf, buckets_sent,
2224 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2225 buckets_sent += buckets_in_message;
2227 "ibf chunk size %u, %llu/%u sent\n",
2228 (unsigned int) buckets_in_message,
2229 (unsigned long long) buckets_sent,
2230 (unsigned int) ibf_size);
2231 GNUNET_MQ_send (op->mq, ev);
2232 }
2233
2234 /* The other peer must decode the IBF, so
2235 * we're passive. */
2236 op->phase = PHASE_PASSIVE_DECODING;
2237 return GNUNET_OK;
2238}
2239
2240
/**
 * Compute the IBF size for an estimated set difference: the difference
 * scaled by the bucket number factor, truncated and made odd.
 *
 * @param diff estimated number of differing elements
 * @param number_buckets_per_element unused, kept for the callers
 * @param ibf_bucket_number_factor buckets to use per differing element;
 *        assumed to be non-negative
 * @return odd IBF size; products that do not fit into an int are
 *         clamped to the largest int value (which is odd)
 */
static unsigned int
get_size_from_difference (unsigned int diff, int number_buckets_per_element,
                          float ibf_bucket_number_factor)
{
  /* largest int value, spelled without relying on <limits.h> */
  const int int_max = (int) (~0U >> 1);
  const float scaled = diff * ibf_bucket_number_factor;

  (void) number_buckets_per_element;
  /* Converting a float that is too large for int is undefined behaviour.
   * (float) int_max rounds up to 2^31, so everything below it fits. */
  if (scaled >= (float) int_max)
    return (unsigned int) int_max;
  return (unsigned int) (((int) scaled) | 1);
}
2257
2258
/**
 * Compute the size of the next IBF after a decoding round: twice the
 * size of the last IBF minus the bucket number factor times the number
 * of elements already decoded, truncated and made odd.
 *
 * @param ibf_bucket_number_factor buckets to use per differing element
 * @param decoded_elements number of elements decoded from the last IBF
 * @param last_ibf_size size of the last IBF
 * @return odd size for the next IBF; 1 if the computation yields a
 *         value that is not positive, the largest unsigned int if it
 *         yields a value that does not fit
 */
static unsigned int
get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
                   decoded_elements, unsigned int last_ibf_size)
{
  /* rounds up to the first power of two that no longer fits */
  const float too_large = (float) ~0U;
  const float next = (last_ibf_size * 2)
                     - (ibf_bucket_number_factor * decoded_elements);

  /* Converting a negative or too large float to unsigned int is
   * undefined behaviour, so clamp before converting.  The negated
   * comparison also catches NaN. */
  if (! (next > 0.0f))
    return 1;
  if (next >= too_large)
    return ~0U;
  return ((unsigned int) next) | 1;
}
2270
2271
2281static int
2283 const struct GNUNET_HashCode *key,
2284 void *value)
2285{
2286 struct Operation *op = cls;
2287 struct GNUNET_SETU_ElementMessage *emsg;
2288 struct ElementEntry *ee = value;
2289 struct GNUNET_SETU_Element *el = &ee->element;
2290 struct GNUNET_MQ_Envelope *ev;
2291
2293 "Sending element %s\n",
2294 GNUNET_h2s (key));
2295#if MEASURE_PERFORMANCE
2296 perf_store.element_full.received += 1;
2297 perf_store.element_full.received_var_bytes += el->size;
2298#endif
2299 ev = GNUNET_MQ_msg_extra (emsg,
2300 el->size,
2302 emsg->element_type = htons (el->element_type);
2303 GNUNET_memcpy (&emsg[1],
2304 el->data,
2305 el->size);
2306 GNUNET_MQ_send (op->mq,
2307 ev);
2308 return GNUNET_YES;
2309}
2310
2311
2317static void
2319{
2320 struct GNUNET_MQ_Envelope *ev;
2321
2322 op->phase = PHASE_FULL_SENDING;
2324 "Dedicing to transmit the full set\n");
2325 /* FIXME: use a more memory-friendly way of doing this with an
2326 iterator, just as we do in the non-full case! */
2327
2328 // Randomize Elements to send
2329 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2330 32,GNUNET_NO);
2331 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2333 UINT64_MAX);
2334 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2335 &
2337 op);
2338
2340 op->set->content->elements_randomized,
2342 op);
2343#if MEASURE_PERFORMANCE
2344 perf_store.full_done.sent += 1;
2345#endif
2347 GNUNET_MQ_send (op->mq,
2348 ev);
2349}
2350
2351
2358static int
2360 const struct StrataEstimatorMessage *msg)
2361{
2362 struct Operation *op = cls;
2363 int is_compressed;
2364 size_t len;
2365
2366 if (op->phase != PHASE_EXPECT_SE)
2367 {
2368 GNUNET_break (0);
2369 return GNUNET_SYSERR;
2370 }
2371 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2372 msg->header.type));
2373 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2374 if ((GNUNET_NO == is_compressed) &&
2376 {
2377 GNUNET_break (0);
2378 return GNUNET_SYSERR;
2379 }
2380 return GNUNET_OK;
2381}
2382
2383
/**
 * Handler for a strata estimator message from the remote peer.
 *
 * Visible flow: record the local element count and mark this peer as the
 * receiving site, validate the phase and the announced estimator count,
 * reject a remote set size above op->byzantine_upper_bound, obtain the
 * remote estimator and diff it against op->se, derive the per-side
 * difference estimates, choose a mode of operation, and then either start
 * a full-set exchange or send an IBF.  On the success path the CADET
 * channel is acknowledged at the very end.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads inside the body, are absent from this
 * listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received strata estimator message
 */
2390static void
2392 const struct StrataEstimatorMessage *msg)
2393{
2394#if MEASURE_PERFORMANCE
2395 perf_store.se.received += 1;
2396 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2398#endif
2399 struct Operation *op = cls;
2400 struct MultiStrataEstimator *remote_se;
2401 unsigned int diff;
2402 uint64_t other_size;
2403 size_t len;
2404 int is_compressed;
2405 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2406 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2407 op->set->content->elements);
2408 // Setting peer site to receiving peer
2409 op->peer_site = 1;
2410
2414 if (GNUNET_OK !=
2415 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2416 {
2417 GNUNET_break (0);
2419 return;
2420 }
2421
/* Accept only se_count values with exactly one bit set and not above 8. */
2423 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2424 {
2426 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2427 msg->se_count);
2428 GNUNET_break_op (0);
2430 return;
2431 }
2432
2433 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2434 msg->header.type));
2436 "# bytes of SE received",
2437 ntohs (msg->header.size),
2438 GNUNET_NO);
2439 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2440 other_size = GNUNET_ntohll (msg->set_size);
2441 op->remote_element_count = other_size;
2442
/* Refuse peers announcing more elements than the configured upper bound.
 * NOTE(review): the format string pairs `%u` with
 * op->remote_element_count, which is assigned from a uint64_t above --
 * confirm the field type matches the specifier. */
2443 if (op->byzantine_upper_bound < op->remote_element_count)
2444 {
2446 "Exceeded configured upper bound <%" PRIu64 "> of element: %u\n",
2447 op->byzantine_upper_bound,
2448 op->remote_element_count);
2450 return;
2451 }
2452
/* NOTE(review): the statement that assigns remote_se is not visible in
 * this listing. */
2456 if (NULL == remote_se)
2457 {
2458 /* insufficient resources, fail */
2460 return;
2461 }
/* NOTE(review): the head of this call is missing; it receives the payload
 * length, the compression flag, the estimator count and remote_se. */
2462 if (GNUNET_OK !=
2464 len,
2465 is_compressed,
2466 msg->se_count,
2468 remote_se))
2469 {
2470 /* decompression failed */
2471 strata_estimator_destroy (remote_se);
2473 return;
2474 }
2475 GNUNET_assert (NULL != op->se);
2476 strata_estimator_difference (remote_se,
2477 op->se);
2478 {
2479 /* Calculate remote local diff */
2480 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2481 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2482 uint64_t avg_element_size = 0;
2483
2484 /* Prevent estimations from overshooting max element */
/* NOTE(review): these expressions mix the signed `long` estimates with
 * the element counters and the upper bound; confirm the intended
 * signedness of the comparison and of the subtraction. */
2485 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2486 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2487 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2488 diff_local = op->byzantine_upper_bound - op->local_element_count;
2489 if ((diff_remote < 0) || (diff_local < 0))
2490 {
2491 strata_estimator_destroy (remote_se);
2493 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2494 "malicious: remote diff %ld, local diff: %ld\n",
2495 diff_remote, diff_local);
2496 GNUNET_break_op (0);
2498 return;
2499 }
2500 /* Make estimation more precise in initial sync cases */
/* If one side is empty, the difference is exactly the other side's size. */
2501 if (0 == op->remote_element_count)
2502 {
2503 diff_remote = 0;
2504 diff_local = op->local_element_count;
2505 }
2506 if (0 == op->local_element_count)
2507 {
2508 diff_local = 0;
2509 diff_remote = op->remote_element_count;
2510 }
2511
2512 diff = diff_remote + diff_local;
2513 op->remote_set_diff = diff_remote;
2514
/* Average local element size; only computed for a non-empty set, which
 * also keeps the division below away from a zero divisor. */
2516 if (0 < op->local_element_count)
2517 {
2518 op->total_elements_size_local = 0;
2519 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2520 &
2522 op);
2523 avg_element_size = op->total_elements_size_local / op->local_element_count
2524 ;
2525 }
2526
2527 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2529 (
2530 op->set->content
2531 ->
2532 elements),
2533 op->
2534 remote_element_count,
2535 diff_remote,
2536 diff_local,
2537 op->
2538 rtt_bandwidth_tradeoff,
2539 op->
2540 ibf_bucket_number_factor);
2541
2542#if MEASURE_PERFORMANCE
2543 perf_store.se_diff_local = diff_local;
2544 perf_store.se_diff_remote = diff_remote;
2545 perf_store.se_diff = diff;
2546 perf_store.mode_of_operation = op->mode_of_operation;
2547#endif
2548
2549 strata_estimator_destroy (remote_se);
/* NOTE(review): one listing line is missing just above the assignment
 * below; presumably op->se is released before being cleared -- confirm. */
2551 op->se = NULL;
2553 "got se diff=%d, using ibf size %d\n",
2554 diff,
2555 1U << get_size_from_difference (diff, op->
2556 ibf_number_buckets_per_element,
2557 op->ibf_bucket_number_factor));
2558
2559 {
2560 char *set_debug;
2561
/* Optional benchmark output, enabled by GNUNET_SETU_BENCHMARK=1. */
2562 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2563 if ((NULL != set_debug) &&
2564 (0 == strcmp (set_debug, "1")))
2565 {
/* NOTE(review): the fopen() result is passed to fprintf()/fclose()
 * without a NULL check. */
2566 FILE *f = fopen ("set.log", "a");
2567 fprintf (f, "%llu\n", (unsigned long long) diff);
2568 fclose (f);
2569 }
2570 }
2571
/* In byzantine mode the remote set must not be smaller than the lower
 * bound. */
2572 if ((GNUNET_YES == op->byzantine) &&
2573 (other_size < op->byzantine_lower_bound))
2574 {
2575 GNUNET_break (0);
2577 return;
2578 }
2579
/* Full-set exchange when forced, or when the estimate chose anything
 * other than DIFFERENTIAL_SYNC. */
2580 if ((GNUNET_YES == op->force_full) ||
2581 (op->mode_of_operation != DIFFERENTIAL_SYNC))
2582 {
2584 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2585 diff,
2586 (unsigned long long) op->initial_size);
2588 "# of full sends",
2589 1,
2590 GNUNET_NO);
2591 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2592 {
/* Announce our parameters, then transmit the whole local set. */
2593 struct TransmitFullMessage *signal_msg;
2594 struct GNUNET_MQ_Envelope *ev;
2595 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2597 signal_msg->remote_set_difference = htonl (diff_local);
2598 signal_msg->remote_set_size = htonl (op->local_element_count);
2599 signal_msg->local_set_difference = htonl (diff_remote);
2600 GNUNET_MQ_send (op->mq,
2601 ev);
2602 send_full_set (op);
2603 }
2604 else
2605 {
2606 struct GNUNET_MQ_Envelope *ev;
2607
2609 "Telling other peer that we expect its full set\n");
2610 op->phase = PHASE_FULL_RECEIVING;
2611#if MEASURE_PERFORMANCE
2612 perf_store.request_full.sent += 1;
2613#endif
2614 {
2615 struct TransmitFullMessage *signal_msg;
2616 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct
2619 signal_msg->remote_set_difference = htonl (diff_local);
2620 signal_msg->remote_set_size = htonl (op->local_element_count);
2621 signal_msg->local_set_difference = htonl (diff_remote);
2622 GNUNET_MQ_send (op->mq,
2623 ev);
2624 }
2625 }
2626 }
2627 else
2628 {
2630 "# of ibf sends",
2631 1,
2632 GNUNET_NO);
2633 if (GNUNET_OK !=
2634 send_ibf (op,
2636 op->ibf_number_buckets_per_element
2637 ,
2638 op->ibf_bucket_number_factor)))
2639 {
2640 /* Internal error, best we can do is shut the connection */
2642 "Failed to send IBF, closing connection\n");
2644 return;
2645 }
2646 }
/* Success path: allow the channel to deliver the next message. */
2647 GNUNET_CADET_receive_done (op->channel);
2648 }
2649}
2650
2651
/**
 * Map iterator callback that offers one element to the remote peer.
 *
 * `cls` is a `struct SendElementClosure` carrying the operation and the
 * IBF key being looked up; `value` is the `struct KeyEntry` stored under
 * the 32-bit map key.  Entries whose full key differs from the requested
 * one are skipped, as are elements for which an offer was already
 * recorded.  Otherwise an offer consisting of a message header followed
 * by the element hash is queued on op->mq.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing, as are the heads of the three bookkeeping
 * calls on op->message_control_flow.
 *
 * @param cls the `struct SendElementClosure *`
 * @param key 32-bit map key of the visited entry (not read in the body)
 * @param value the `struct KeyEntry *` being visited
 * @return #GNUNET_YES to continue iterating, #GNUNET_NO after a
 *         bookkeeping failure
 */
2659static int
2661 uint32_t key,
2662 void *value)
2663{
2664 struct SendElementClosure *sec = cls;
2665 struct Operation *op = sec->op;
2666 struct KeyEntry *ke = value;
2667 struct GNUNET_MQ_Envelope *ev;
2668 struct GNUNET_MessageHeader *mh;
2669
2670 /* Detect 32-bit key collision for the 64-bit IBF keys. */
2671 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2672 {
2673 op->active_passive_switch_required = true;
2674 return GNUNET_YES;
2675 }
2676
2677 /* Prevent implementation from sending a offer multiple times in case of roll switch */
2678 if (GNUNET_YES ==
2680 op->message_control_flow,
2681 &ke->element->element_hash,
2683 )
2684 {
2686 "Skipping already sent processed element offer!\n");
2687 return GNUNET_YES;
2688 }
2689
2690 /* Save send offer message for message control */
2691 if (GNUNET_YES !=
2693 op->message_control_flow,
2695 &ke->element->element_hash,
2697 )
2698 {
2700 "Double offer message sent found!\n");
2701 GNUNET_break (0);
2703 return GNUNET_NO;
2704 }
/* NOTE(review): the lone `;` below is an empty statement. */
2705 ;
2706
2707 /* Mark element to be expected to received */
2708 if (GNUNET_YES !=
2710 op->message_control_flow,
2712 &ke->element->element_hash,
2714 )
2715 {
2717 "Double demand received found!\n");
2718 GNUNET_break (0);
2720 return GNUNET_NO;
2721 }
/* NOTE(review): another empty statement. */
2722 ;
2723#if MEASURE_PERFORMANCE
2724 perf_store.offer.sent += 1;
2725 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2726#endif
/* NOTE(review): the head of the envelope allocation is missing; the
 * visible argument reserves room for one GNUNET_HashCode of payload. */
2728 sizeof(struct GNUNET_HashCode),
2730 GNUNET_assert (NULL != ev);
/* The payload directly behind the header is the element hash. */
2731 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2733 "[OP %p] sending element offer (%s) to peer\n",
2734 op,
2736 GNUNET_MQ_send (op->mq, ev);
2737 return GNUNET_YES;
2738}
2739
2740
/**
 * Offer every element stored under the given IBF key.
 *
 * Fills a `struct SendElementClosure` with the key and the operation and
 * hands it, together with op->key_to_element and the key value truncated
 * to 32 bits, to a lookup call.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * the head of the lookup call and its callback argument are absent from
 * this listing.
 *
 * @param op the operation (used through `op->key_to_element`)
 * @param ibf_key the full IBF key whose elements should be offered
 */
2747static void
2749 struct IBF_Key ibf_key)
2750{
2751 struct SendElementClosure send_cls;
2752
2753 send_cls.ibf_key = ibf_key;
2754 send_cls.op = op;
2756 op->key_to_element,
/* Only the low 32 bits of the key are used for the lookup. */
2757 (uint32_t) ibf_key.
2758 key_val,
2760 &send_cls);
2761}
2762
2763
/**
 * Decode the difference between the local and the remote IBF and act on
 * every key that comes out.
 *
 * A local IBF of the remote size is prepared, copied, and the remote IBF
 * is subtracted from the copy; the remote IBF is then destroyed.  Keys are
 * decoded in a loop:
 *  - side 1: the key is unsalted with op->salt_receive and passed on
 *    (the callee's name is not visible in this listing);
 *  - side -1: an inquiry message carrying the key is sent to the peer;
 *  - ibf_decode() returning #GNUNET_NO: a final message is queued and the
 *    loop ends;
 *  - #GNUNET_SYSERR or a detected cycle: a larger IBF is sent, unless the
 *    next size would exceed MAX_IBF_SIZE, in which case the call fails.
 *
 * NOTE(review): the line with the function name and parameter list is
 * absent from this listing, as are several call/log heads in the body.
 *
 * @return #GNUNET_OK on success (including a retry with a larger IBF),
 *         #GNUNET_SYSERR on failure
 */
2771static int
2773{
2774 struct IBF_Key key;
2775 struct IBF_Key last_key;
2776 int side;
2777 unsigned int num_decoded;
2778 struct InvertibleBloomFilter *diff_ibf;
2779
2781
2782 if (GNUNET_OK !=
2783 prepare_ibf (op,
2784 op->remote_ibf->size))
2785 {
2786 GNUNET_break (0);
2787 /* allocation failed */
2788 return GNUNET_SYSERR;
2789 }
2790
/* Work on a copy so that op->local_ibf itself is left untouched. */
2791 diff_ibf = ibf_dup (op->local_ibf);
2792 ibf_subtract (diff_ibf,
2793 op->remote_ibf);
2794
2795 ibf_destroy (op->remote_ibf);
2796 op->remote_ibf = NULL;
2797
2799 "decoding IBF (size=%u)\n",
2800 diff_ibf->size);
2801
2802 num_decoded = 0;
2803 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2804
2805 while (1)
2806 {
2807 int res;
2808 int cycle_detected = GNUNET_NO;
2809
2810 last_key = key;
2811
2812 res = ibf_decode (diff_ibf,
2813 &side,
2814 &key);
2815 if (res == GNUNET_OK)
2816 {
2818 "decoded ibf key %lx\n",
2819 (unsigned long) key.key_val);
2820 num_decoded += 1;
/* A cycle is assumed when more keys were decoded than the IBF has
 * buckets, or when the same key is produced twice in a row. */
2821 if ((num_decoded > diff_ibf->size) ||
2822 ((num_decoded > 1) &&
2823 (last_key.key_val == key.key_val)))
2824 {
2826 "detected cyclic ibf (decoded %u/%u)\n",
2827 num_decoded,
2828 diff_ibf->size);
2829 cycle_detected = GNUNET_YES;
2830 }
2831 }
2832 if ((GNUNET_SYSERR == res) ||
2833 (GNUNET_YES == cycle_detected))
2834 {
2835 uint32_t next_size;
2838 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2839 diff_ibf->size);
2842 {
/* Enforce the minimum size; `| 1` makes the minimum odd. */
2843 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2844
2845 if (next_size<ibf_min_size)
2846 next_size = ibf_min_size;
2847 }
2848
2849 if (next_size <= MAX_IBF_SIZE)
2850 {
2852 "decoding failed, sending larger ibf (size %u)\n",
2853 next_size);
2855 "# of IBF retries",
2856 1,
2857 GNUNET_NO);
2858#if MEASURE_PERFORMANCE
2859 perf_store.active_passive_switches += 1;
2860#endif
2861
/* Post-increment: salt_send receives the old salt_receive value, and
 * salt_receive is advanced by one afterwards. */
2862 op->salt_send = op->salt_receive++;
2863
2864 if (GNUNET_OK !=
2865 send_ibf (op, next_size))
2866 {
2867 /* Internal error, best we can do is shut the connection */
2869 "Failed to send IBF, closing connection\n");
2871 ibf_destroy (diff_ibf);
2872 return GNUNET_SYSERR;
2873 }
2874 }
2875 else
2876 {
2878 "# of failed union operations (too large)",
2879 1,
2880 GNUNET_NO);
2881 // XXX: Send the whole set, element-by-element
2883 "set union failed: reached ibf limit\n");
2885 ibf_destroy (diff_ibf);
2886 return GNUNET_SYSERR;
2887 }
2888 break;
2889 }
2890 if (GNUNET_NO == res)
2891 {
2892 struct GNUNET_MQ_Envelope *ev;
2893
2895 "transmitted all values, sending DONE\n");
2896
2897#if MEASURE_PERFORMANCE
2898 perf_store.done.sent += 1;
2899#endif
/* NOTE(review): the statement that creates `ev` is not visible in this
 * listing. */
2901 GNUNET_MQ_send (op->mq, ev);
2902 /* We now wait until we get a DONE message back
2903 * and then wait for our MQ to be flushed and all our
2904 * demands be delivered. */
2905 break;
2906 }
2907 if (1 == side)
2908 {
2909 struct IBF_Key unsalted_key;
2910 unsalt_key (&key,
2911 op->salt_receive,
2912 &unsalted_key);
2914 unsalted_key);
2915 }
2916 else if (-1 == side)
2917 {
2918 struct GNUNET_MQ_Envelope *ev;
2919 struct InquiryMessage *msg;
2920
2921#if MEASURE_PERFORMANCE
2922 perf_store.inquery.sent += 1;
2923 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2924#endif
2925
/* Record the inquiry in op->inquiries_sent, keyed by a hash of the key.
 * NOTE(review): `hashed_key` is heap-allocated and not released in the
 * visible lines; whether the map takes ownership cannot be told from
 * here.  `&mcfs` is stored as the map value, but its declaration is not
 * visible -- if it is a local of this block the stored pointer dangles
 * once the block exits.  Confirm both. */
2927 struct GNUNET_HashContext *hashed_key_context =
2929 struct GNUNET_HashCode *hashed_key = (struct
2931 sizeof(struct GNUNET_HashCode));
2933 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2934 &key,
2935 sizeof(struct IBF_Key));
2936 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2937 hashed_key);
2938 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2939 hashed_key,
2940 &mcfs,
2942 );
2943
2944 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2945 * the effort additional complexity. */
2947 sizeof(struct IBF_Key),
2949 msg->salt = htonl (op->salt_receive);
/* The single key is the payload directly behind the message struct. */
2950 GNUNET_memcpy (&msg[1],
2951 &key,
2952 sizeof(struct IBF_Key));
2954 "sending element inquiry for IBF key %lx\n",
2955 (unsigned long) key.key_val);
2956 GNUNET_MQ_send (op->mq, ev);
2957 }
2958 else
2959 {
/* Any side value other than 1 or -1 is treated as an internal error. */
2960 GNUNET_assert (0);
2961 }
2962 }
2963 ibf_destroy (diff_ibf);
2964 return GNUNET_OK;
2965}
2966
2967
/**
 * Validation callback for a `struct TransmitFullMessage`.
 *
 * Performs no checks of its own and accepts every message.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param msg the received message (not inspected)
 * @return always #GNUNET_OK
 */
2975static int
2977 const struct TransmitFullMessage *msg)
2978{
2979 return GNUNET_OK;
2980}
2981
2982
/**
 * Handler for a `struct TransmitFullMessage` announcing that the remote
 * peer will send its full set first.
 *
 * Requires PHASE_EXPECT_IBF, copies the three counters from the message
 * into the operation, applies a plausibility test (condition not visible
 * in this listing), recomputes the local element count and average element
 * size, re-derives the mode of operation and, if that agrees with the
 * peer's choice, switches to PHASE_FULL_RECEIVING.
 *
 * NOTE(review): unlike the neighbouring handlers, no
 * GNUNET_CADET_receive_done() call is visible on the success path of this
 * function -- confirm that this is intended.
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received message
 */
2989static void
2991 const struct TransmitFullMessage *msg)
2992{
2993 struct Operation *op = cls;
2994
2998 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2999 if (GNUNET_OK !=
3000 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3001 {
3002 GNUNET_break (0);
3004 return;
3005 }
3006
/* All three fields arrive as 32-bit values in network byte order. */
3008 op->remote_element_count = ntohl (msg->remote_set_size);
3009 op->remote_set_diff = ntohl (msg->remote_set_difference);
3010 op->local_set_diff = ntohl (msg->local_set_difference);
3011
/* NOTE(review): the `if` condition guarding this block is missing. */
3014 {
3016 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3017 "criteria\n");
3018 GNUNET_break_op (0);
3020 return;
3021 }
3022
3024 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3025 op->set->content->elements);
3026 {
3027 uint64_t avg_element_size = 0;
/* Skip the average for an empty set; this also avoids a zero divisor. */
3029 if (0 < op->local_element_count)
3030 {
3031 op->total_elements_size_local = 0;
3032 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3033 &
3035 op);
3036 avg_element_size = op->total_elements_size_local / op->local_element_count
3037 ;
3038 }
3039
/* NOTE(review): the head of the call that produces `mode_of_operation`
 * and the comparison that follows are missing; only fragments of the
 * argument list remain. */
3042 op->
3044 op->
3046 op->local_set_diff,
3047 op->remote_set_diff
3048 ,
3049 op->
3051 op->
3054 {
3056 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3057 " : %d\n", mode_of_operation);
3058 GNUNET_break_op (0);
3060 return;
3061 }
3062 op->phase = PHASE_FULL_RECEIVING;
3063 }
3064}
3065
3066
/**
 * Validation callback for an IBF message.
 *
 * Rejects messages that carry no buckets or whose payload fails the second
 * size test.  In PHASE_EXPECT_IBF_LAST the offset, IBF size and salt must
 * match the transfer already in progress; in any other phase only
 * PHASE_PASSIVE_DECODING and PHASE_EXPECT_IBF are accepted.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and the right-hand parts of the two bucket computations, are absent from
 * this listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received IBF message
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR otherwise
 */
3077static int
3079 const struct IBFMessage *msg)
3080{
3081 struct Operation *op = cls;
3082 unsigned int buckets_in_message;
3083
3084 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3086 if (0 == buckets_in_message)
3087 {
3088 GNUNET_break_op (0);
3089 return GNUNET_SYSERR;
3090 }
3091 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3093 {
3094 GNUNET_break_op (0);
3095 return GNUNET_SYSERR;
3096 }
3097 if (op->phase == PHASE_EXPECT_IBF_LAST)
3098 {
/* Continuation slice: must start where the previous one ended. */
3099 if (ntohl (msg->offset) != op->ibf_buckets_received)
3100 {
3101 GNUNET_break_op (0);
3102 return GNUNET_SYSERR;
3103 }
3104
/* NOTE(review): ibf_size is compared without a byte-order conversion,
 * whereas offset and salt go through ntohl() -- confirm the wire format
 * of this field. */
3105 if (msg->ibf_size != op->remote_ibf->size)
3106 {
3107 GNUNET_break_op (0);
3108 return GNUNET_SYSERR;
3109 }
3110 if (ntohl (msg->salt) != op->salt_receive)
3111 {
3112 GNUNET_break_op (0);
3113 return GNUNET_SYSERR;
3114 }
3115 }
3116 else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3117 (op->phase != PHASE_EXPECT_IBF))
3118 {
3119 GNUNET_break_op (0);
3120 return GNUNET_SYSERR;
3121 }
3122
3123 return GNUNET_OK;
3124}
3125
3126
/**
 * Handler for an IBF message (one slice of the remote IBF).
 *
 * On the first slice (phase PHASE_PASSIVE_DECODING or PHASE_EXPECT_IBF)
 * a new remote IBF is created, the salt is taken from the message and the
 * offset must be zero.  Every slice is then read into op->remote_ibf.
 * Once all buckets have arrived the phase becomes PHASE_ACTIVE_DECODING
 * and decoding is started.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads in the body, are absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received IBF message
 */
3136static void
3138 const struct IBFMessage *msg)
3139{
3140 struct Operation *op = cls;
3141 unsigned int buckets_in_message;
3145 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3147 if (GNUNET_OK !=
3148 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3149 {
3150 GNUNET_break (0);
3152 return;
3153 }
3154 op->differential_sync_iterations++;
3156 op->active_passive_switch_required = false;
3157
3158#if MEASURE_PERFORMANCE
3159 perf_store.ibf.received += 1;
3160 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3161#endif
3162
3163 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3165 if ((op->phase == PHASE_PASSIVE_DECODING) ||
3166 (op->phase == PHASE_EXPECT_IBF))
3167 {
/* First slice of a new IBF. */
3168 op->phase = PHASE_EXPECT_IBF_LAST;
3169 GNUNET_assert (NULL == op->remote_ibf);
3171 "Creating new ibf of size %u\n",
3172 ntohl (msg->ibf_size));
3173 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
/* NOTE(review): the size passed to ibf_create() is the raw field, while
 * the log line above prints ntohl (msg->ibf_size) -- confirm which byte
 * order the field uses. */
3174 op->remote_ibf = ibf_create (msg->ibf_size,
3175 ((uint8_t) op->ibf_number_buckets_per_element))
3176 ;
3177 op->salt_receive = ntohl (msg->salt);
3179 "Receiving new IBF with salt %u\n",
3180 op->salt_receive);
3181 if (NULL == op->remote_ibf)
3182 {
3184 "Failed to parse remote IBF, closing connection\n");
3186 return;
3187 }
3188 op->ibf_buckets_received = 0;
3189 if (0 != ntohl (msg->offset))
3190 {
3191 GNUNET_break_op (0);
3193 return;
3194 }
3195 }
3196 else
3197 {
3200 "Received more of IBF\n");
3201 }
3202 GNUNET_assert (NULL != op->remote_ibf);
3203
/* Append this slice at the current bucket offset. */
3204 ibf_read_slice (&msg[1],
3205 op->ibf_buckets_received,
3206 buckets_in_message,
3207 op->remote_ibf, msg->ibf_counter_bit_length);
3208 op->ibf_buckets_received += buckets_in_message;
3209
3210 if (op->ibf_buckets_received == op->remote_ibf->size)
3211 {
3213 "received full ibf\n");
3214 op->phase = PHASE_ACTIVE_DECODING;
/* NOTE(review): the call whose result is tested here is not visible in
 * this listing. */
3215 if (GNUNET_OK !=
3217 {
3218 /* Internal error, best we can do is shut down */
3220 "Failed to decode IBF, closing connection\n");
3222 return;
3223 }
3224 }
3225 GNUNET_CADET_receive_done (op->channel);
3226}
3227
3228
/**
 * Send one element to the client that owns the operation's set.
 *
 * Builds a `struct GNUNET_SETU_ResultMessage` with the element data as
 * payload, fills in status, request id and element type, and queues it on
 * op->set->cs->mq.  The operation must have a non-zero client request id.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * the declaration of `status`, and the heads of a few calls are absent
 * from this listing.
 *
 * @param element the element to deliver
 */
3237static void
3239 const struct GNUNET_SETU_Element *element,
3241{
3242 struct GNUNET_MQ_Envelope *ev;
3243 struct GNUNET_SETU_ResultMessage *rm;
3244
3246 "sending element (size %u) to client\n",
3247 element->size);
3248 GNUNET_assert (0 != op->client_request_id);
3249 ev = GNUNET_MQ_msg_extra (rm,
3250 element->size,
/* NOTE(review): GNUNET_MQ_discard() is called with `ev` inside the branch
 * where `ev` is NULL -- confirm that discarding a NULL envelope is
 * permitted, or drop the call. */
3252 if (NULL == ev)
3253 {
3254 GNUNET_MQ_discard (ev);
3255 GNUNET_break (0);
3256 return;
3257 }
3258 rm->result_status = htons (status);
3259 rm->request_id = htonl (op->client_request_id);
3260 rm->element_type = htons (element->element_type);
3262 op->key_to_element));
/* Element bytes follow directly behind the fixed-size result message. */
3263 GNUNET_memcpy (&rm[1],
3264 element->data,
3265 element->size);
3266 GNUNET_MQ_send (op->set->cs->mq,
3267 ev);
3268}
3269
3270
/**
 * Advance the operation to PHASE_FINISHED if nothing is outstanding.
 *
 * In PHASE_FINISH_WAITING, when `send_done` is not -1, the phase becomes
 * PHASE_FINISHED and a message is queued on op->mq.  In
 * PHASE_FINISH_CLOSING, under the same condition, the phase becomes
 * PHASE_FINISHED.  The number of still-demanded hashes is only used for
 * logging in the visible code.
 *
 * NOTE(review): the line with the function name and parameter list, the
 * declaration and computation of `send_done`, and the statements that
 * follow the phase change in the second branch are absent from this
 * listing.
 */
3276static void
3278{
3279 unsigned int num_demanded;
3281 op->message_control_flow,
3282 &
3284 op);
3285 num_demanded = GNUNET_CONTAINER_multihashmap_size (
3286 op->demanded_hashes);
3287 if (PHASE_FINISH_WAITING == op->phase)
3288 {
3290 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3291 num_demanded, op->peer_site);
3292 if (-1 != send_done)
3293 {
3294 struct GNUNET_MQ_Envelope *ev;
3295
3296 op->phase = PHASE_FINISHED;
3297#if MEASURE_PERFORMANCE
3298 perf_store.done.sent += 1;
3299#endif
3301 GNUNET_MQ_send (op->mq,
3302 ev);
3303 /* We now wait until the other peer sends P2P_OVER
3304 * after it got all elements from us. */
3305 }
3306 }
/* Not an `else`: this test is evaluated after the first branch, which may
 * already have moved the phase to PHASE_FINISHED. */
3307 if (PHASE_FINISH_CLOSING == op->phase)
3308 {
3310 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3311 num_demanded, op->peer_site);
3312 if (-1 != send_done)
3313 {
3314 op->phase = PHASE_FINISHED;
3317 }
3318 }
3319}
3320
3321
/**
 * Validation callback for an element message sent in response to a demand.
 *
 * An element is only acceptable while at least one demanded hash is
 * outstanding.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param emsg the received element message (not inspected here)
 * @return #GNUNET_OK if demands are outstanding, #GNUNET_SYSERR otherwise
 */
3328static int
3330 const struct GNUNET_SETU_ElementMessage *emsg)
3331{
3332 struct Operation *op = cls;
3333
3334 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3335 {
3336 GNUNET_break_op (0);
3337 return GNUNET_SYSERR;
3338 }
3339 return GNUNET_OK;
3340}
3341
3342
/**
 * Handler for an element the remote peer sent in response to a demand.
 *
 * Copies the payload into a freshly allocated `struct ElementEntry`,
 * removes the element's hash from op->demanded_hashes (failing if it was
 * never demanded), updates the receive counters and either discards the
 * entry as a repeat or registers it as new.  Finally acknowledges the
 * channel and calls maybe_finish().
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads in the body, are absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param emsg the received element message
 */
3351static void
3353 const struct GNUNET_SETU_ElementMessage *emsg)
3354{
3355 struct Operation *op = cls;
3356 struct ElementEntry *ee;
3357 struct KeyEntry *ke;
3358 uint16_t element_size;
3359
3363 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3365 if (GNUNET_OK !=
3366 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3367 {
3368 GNUNET_break (0);
3370 return;
3371 }
3372
3373 element_size = ntohs (emsg->header.size) - sizeof(struct
3375#if MEASURE_PERFORMANCE
3376 perf_store.element.received += 1;
3377 perf_store.element.received_var_bytes += element_size;
3378#endif
3379
/* Entry header and element bytes share one allocation; the data pointer
 * refers to the bytes directly behind the header. */
3380 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3381 GNUNET_memcpy (&ee[1],
3382 &emsg[1],
3383 element_size);
3384 ee->element.size = element_size;
3385 ee->element.data = &ee[1];
3386 ee->element.element_type = ntohs (emsg->element_type);
3387 ee->remote = GNUNET_YES;
3389 &ee->element_hash);
3390 if (GNUNET_NO ==
3391 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3392 &ee->element_hash,
3393 NULL))
3394 {
3395 /* We got something we didn't demand, since it's not in our map. */
/* NOTE(review): `ee` is not freed in the visible lines of this early
 * return (one line is missing before it) -- confirm it is not leaked. */
3396 GNUNET_break_op (0);
3398 return;
3399 }
3400
3401 if (GNUNET_OK !=
3403 op->message_control_flow,
3405 &ee->element_hash,
3407 )
3408 {
3410 "An element has been received more than once!\n");
/* NOTE(review): same question about `ee` on this early return. */
3411 GNUNET_break (0);
3413 return;
3414 }
3415
3417 "Got element (size %u, hash %s) from peer\n",
3418 (unsigned int) element_size,
3419 GNUNET_h2s (&ee->element_hash));
3420
3422 "# received elements",
3423 1,
3424 GNUNET_NO);
3426 "# exchanged elements",
3427 1,
3428 GNUNET_NO);
3429
3430 op->received_total++;
3431
3432 ke = op_get_element (op,
3433 &ee->element_hash);
3434 if (NULL != ke)
3435 {
3436 /* Got repeated element. Should not happen since
3437 * we track demands. */
3439 "# repeated elements",
3440 1,
3441 GNUNET_NO);
3442 ke->received = GNUNET_YES;
3443 GNUNET_free (ee);
3444 }
3445 else
3446 {
3448 "Registering new element from remote peer\n");
3449 op->received_fresh++;
3451 /* only send results immediately if the client wants it */
3453 &ee->element,
3455 }
3456
/* Heuristic: after more than 8 elements, fail if fewer than a third of
 * them were new to us. */
3457 if ((op->received_total > 8) &&
3458 (op->received_fresh < op->received_total / 3))
3459 {
3460 /* The other peer gave us lots of old elements, there's something wrong. */
3461 GNUNET_break_op (0);
3463 return;
3464 }
3465 GNUNET_CADET_receive_done (op->channel);
3466 maybe_finish (op);
3467}
3468
3469
/**
 * Validation callback for an element message received during a full-set
 * transfer.
 *
 * Currently accepts every message; the operation pointer is fetched but
 * deliberately unused (see the FIXME below).
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param emsg the received element message (not inspected)
 * @return always #GNUNET_OK
 */
3476static int
3478 const struct GNUNET_SETU_ElementMessage *emsg)
3479{
3480 struct Operation *op = cls;
3481
3482 (void) op;
3483
3484 // FIXME: check that we expect full elements here?
3485 return GNUNET_OK;
3486}
3487
3488
/**
 * Handler for an element received while the remote peer transmits its
 * full set.
 *
 * Copies the payload into a freshly allocated `struct ElementEntry`,
 * updates the receive counters and either discards the entry as a repeat
 * or registers it as new.  In byzantine mode the operation fails once more
 * elements have arrived than the peer announced.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads in the body, are absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param emsg the received element message
 */
3495static void
3497 const struct GNUNET_SETU_ElementMessage *emsg)
3498{
3499 struct Operation *op = cls;
3500 struct ElementEntry *ee;
3501 struct KeyEntry *ke;
3502 uint16_t element_size;
3503
3507 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3508 if (GNUNET_OK !=
3509 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3510 {
3511 GNUNET_break (0);
3513 return;
3514 }
3515
3516 element_size = ntohs (emsg->header.size)
3517 - sizeof(struct GNUNET_SETU_ElementMessage);
3518
3519#if MEASURE_PERFORMANCE
3520 perf_store.element_full.received += 1;
3521 perf_store.element_full.received_var_bytes += element_size;
3522#endif
3523
/* Entry header and element bytes share one allocation. */
3524 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3525 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3526 ee->element.size = element_size;
3527 ee->element.data = &ee[1];
3528 ee->element.element_type = ntohs (emsg->element_type);
3529 ee->remote = GNUNET_YES;
3531 &ee->element_hash);
3533 "Got element (full diff, size %u, hash %s) from peer\n",
3534 (unsigned int) element_size,
3535 GNUNET_h2s (&ee->element_hash));
3536
3538 "# received elements",
3539 1,
3540 GNUNET_NO);
3542 "# exchanged elements",
3543 1,
3544 GNUNET_NO);
3545
3546 op->received_total++;
3547 ke = op_get_element (op,
3548 &ee->element_hash);
3549 if (NULL != ke)
3550 {
/* Already known locally: mark it as received and drop the copy. */
3552 "# repeated elements",
3553 1,
3554 GNUNET_NO);
3556 ke->received = GNUNET_YES;
3557 GNUNET_free (ee);
3558 }
3559 else
3560 {
3562 "Registering new element from remote peer\n");
3563 op->received_fresh++;
3565 /* only send results immediately if the client wants it */
3567 &ee->element,
3569 }
3570
3571
3572 if ((GNUNET_YES == op->byzantine) &&
3573 (op->received_total > op->remote_element_count) )
3574 {
3575 /* The other peer gave us lots of old elements, there's something wrong. */
3577 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3578 (unsigned long long) op->received_total,
3579 (unsigned long long) op->remote_element_count);
3580 GNUNET_break_op (0);
3582 return;
3583 }
3584 GNUNET_CADET_receive_done (op->channel);
3585}
3586
3587
/**
 * Validation callback for an inquiry message.
 *
 * Inquiries are only accepted in PHASE_PASSIVE_DECODING, and the payload
 * must be an exact multiple of `sizeof (struct IBF_Key)`.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received inquiry message
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR otherwise
 */
3595static int
3597 const struct InquiryMessage *msg)
3598{
3599 struct Operation *op = cls;
3600 unsigned int num_keys;
3601
3602 if (op->phase != PHASE_PASSIVE_DECODING)
3603 {
3604 GNUNET_break_op (0);
3605 return GNUNET_SYSERR;
3606 }
3607 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3608 / sizeof(struct IBF_Key);
/* Reject payloads with trailing bytes that do not form a whole key. */
3609 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3610 != num_keys * sizeof(struct IBF_Key))
3611 {
3612 GNUNET_break_op (0);
3613 return GNUNET_SYSERR;
3614 }
3615 return GNUNET_OK;
3616}
3617
3618
/**
 * Handler for an inquiry message: the remote peer asks which elements we
 * hold for a list of IBF keys.
 *
 * After the phase check, one entry is added to op->inquiries_sent and each
 * key in the payload is unsalted with the salt from the message and passed
 * on (the callee's name is not visible in this listing).  The channel is
 * acknowledged afterwards.
 *
 * NOTE(review): the phase list here also contains PHASE_ACTIVE_DECODING,
 * while the validation callback above accepts only
 * PHASE_PASSIVE_DECODING -- confirm which is intended.
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/declaration heads in the body, are absent from this
 * listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received inquiry message
 */
3625static void
3627 const struct InquiryMessage *msg)
3628{
3629 struct Operation *op = cls;
3630 const struct IBF_Key *ibf_key;
3631 unsigned int num_keys;
3632
3636 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3637 if (GNUNET_OK !=
3638 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3639 {
3640 GNUNET_break (0);
3642 return;
3643 }
3644
3645#if MEASURE_PERFORMANCE
3646 perf_store.inquery.received += 1;
3647 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3648 - sizeof(struct InquiryMessage));
3649#endif
3650
3652 "Received union inquiry\n");
3653 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3654 / sizeof(struct IBF_Key);
/* The keys are the payload directly behind the message struct. */
3655 ibf_key = (const struct IBF_Key *) &msg[1];
3656 {
3658 struct GNUNET_HashContext *hashed_key_context =
3660 struct GNUNET_HashCode *hashed_key = GNUNET_new (struct GNUNET_HashCode);
/* NOTE(review): `&ibf_key` is the address of the local pointer variable,
 * not of the key it points to, so the bytes hashed here are those of the
 * pointer itself -- this looks unintended; `ibf_key` would hash the
 * first key.  Confirm. */
3662 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3663 &ibf_key,
3664 sizeof(struct IBF_Key));
3665 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3666 hashed_key);
/* NOTE(review): `hashed_key` is not released in the visible lines, and
 * `&mcfs` (declaration not visible) is stored as the map value -- if it
 * is a local of this block the stored pointer dangles.  Confirm both. */
3667 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3668 hashed_key,
3669 &mcfs,
3671 );
3672
3673 while (0 != num_keys--)
3674 {
3675 struct IBF_Key unsalted_key;
3676 unsalt_key (ibf_key,
3677 ntohl (msg->salt),
3678 &unsalted_key);
3680 unsalted_key);
3681 ibf_key++;
3682 }
3683 GNUNET_CADET_receive_done (op->channel);
3684 }
3685}
3686
3687
/**
 * Map iterator callback that sends one locally held element to the remote
 * peer unless that element was itself received from the peer.
 *
 * Entries with `ke->received == GNUNET_YES` are skipped.  For all others
 * an element message with the element data as payload is queued on
 * op->mq.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and the message-type argument of the envelope allocation, are absent
 * from this listing.
 *
 * @param cls the `struct Operation *`
 * @param key 32-bit map key of the visited entry (not read in the body)
 * @param value the `struct KeyEntry *` being visited
 * @return always #GNUNET_YES (continue iterating)
 */
3698static int
3700 uint32_t key,
3701 void *value)
3702{
3703 struct Operation *op = cls;
3704 struct KeyEntry *ke = value;
3705 struct GNUNET_MQ_Envelope *ev;
3706 struct GNUNET_SETU_ElementMessage *emsg;
3707 struct ElementEntry *ee = ke->element;
3708
3709 if (GNUNET_YES == ke->received)
3710 return GNUNET_YES;
3711#if MEASURE_PERFORMANCE
/* NOTE(review): a `received` counter is incremented on a path that sends
 * an element -- possibly meant to be the `sent` counter; confirm. */
3712 perf_store.element_full.received += 1;
3713#endif
3714 ev = GNUNET_MQ_msg_extra (emsg,
3715 ee->element.size,
3717 GNUNET_memcpy (&emsg[1],
3718 ee->element.data,
3719 ee->element.size);
/* NOTE(review): emsg->reserved is not assigned here, unlike in the demand
 * handler further down -- confirm the allocation zero-fills the message. */
3720 emsg->element_type = htons (ee->element.element_type);
3721 GNUNET_MQ_send (op->mq,
3722 ev);
3723 return GNUNET_YES;
3724}
3725
3726
/**
 * Validation callback for a `struct TransmitFullMessage` that requests our
 * full set.
 *
 * Performs no checks of its own and accepts every message.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param mh the received message (not inspected)
 * @return always #GNUNET_OK
 */
3733static int
3735 const struct TransmitFullMessage *mh)
3736{
3737 return GNUNET_OK;
3738}
3739
3740
/**
 * Handler for a request from the remote peer to transmit our full set.
 *
 * Requires PHASE_EXPECT_IBF, copies the three counters from the message
 * into the operation, applies a plausibility test (condition not visible
 * in this listing), recomputes the local element count and average element
 * size, re-derives the mode of operation and, if that agrees with the
 * peer's request, sends the full set and acknowledges the channel.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/condition heads in the body, are absent from this
 * listing.
 *
 * @param cls the `struct Operation *`
 * @param msg the received message
 */
3741static void
3743 const struct TransmitFullMessage *msg)
3744{
3745 struct Operation *op = cls;
3746 uint64_t avg_element_size = 0;
3747 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3748
3752 if (GNUNET_OK !=
3753 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3754 {
3755 GNUNET_break (0);
3757 return;
3758 }
3759
/* All three fields arrive as 32-bit values in network byte order. */
3760 op->remote_element_count = ntohl (msg->remote_set_size);
3761 op->remote_set_diff = ntohl (msg->remote_set_difference);
3762 op->local_set_diff = ntohl (msg->local_set_difference);
3763
3764
/* NOTE(review): the `if` condition guarding this block is missing. */
3766 {
3768 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3769 "criteria\n");
3770 GNUNET_break_op (0);
3772 return;
3773 }
3774
3775#if MEASURE_PERFORMANCE
3776 perf_store.request_full.received += 1;
3777#endif
3778
3780 "Received request for full set transmission\n");
3781
3783 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3784 op->set->content->elements);
/* Skip the average for an empty set; this also avoids a zero divisor. */
3785 if (0 < op->local_element_count)
3786 {
3787 op->total_elements_size_local = 0;
3788 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3789 &
3791 op);
3792 avg_element_size = op->total_elements_size_local / op->local_element_count;
3793 }
3794 {
3795 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3796 op->
3798 op->
3800 op->local_set_diff,
3801 op->remote_set_diff
3802 ,
3803 op->
3805 op->
/* NOTE(review): the comparison of mode_of_operation that guards this
 * block is missing. */
3808 {
3810 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3811 " : %d\n", mode_of_operation);
3812 GNUNET_break_op (0);
3814 return;
3815 }
3816 }
3817 // FIXME: we need to check that our set is larger than the
3818 // byzantine_lower_bound by some threshold
3819 send_full_set (op);
3820 GNUNET_CADET_receive_done (op->channel);
3821}
3822
3823
/**
 * Handler for the message that ends a full-set transmission.
 *
 * Allowed in PHASE_FULL_SENDING and PHASE_FULL_RECEIVING.  In the first
 * switch branch (its `case` label is missing from this listing) the
 * byzantine element-count check is applied, a final message is queued and
 * the phase becomes PHASE_FINISHED.  In PHASE_FULL_SENDING the phase
 * becomes PHASE_FINISHED and the channel is acknowledged before returning.
 * Any other phase is a protocol error.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads in the body, are absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param mh the received message (only its arrival matters)
 */
3830static void
3832 const struct GNUNET_MessageHeader *mh)
3833{
3834 struct Operation *op = cls;
3835
3839 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3840 if (GNUNET_OK !=
3841 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3842 {
3843 GNUNET_break (0);
3845 return;
3846 }
3847
3848#if MEASURE_PERFORMANCE
3849 perf_store.full_done.received += 1;
3850#endif
3851
3852 switch (op->phase)
3853 {
/* NOTE(review): the `case` label for this branch is missing; given the
 * allowed phases and the label further down it is presumably
 * PHASE_FULL_RECEIVING -- confirm. */
3855 {
3856 struct GNUNET_MQ_Envelope *ev;
3857
/* In byzantine mode the peer must have sent exactly as many elements as
 * it announced. */
3858 if ((GNUNET_YES == op->byzantine) &&
3859 (op->received_total != op->remote_element_count) )
3860 {
3861 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3863 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3864 (unsigned long long) op->received_total,
3865 (unsigned long long) op->remote_element_count);
3866 GNUNET_break_op (0);
3868 return;
3869 }
3870
3872 "got FULL DONE, sending elements that other peer is missing\n");
3873
3874 /* send all the elements that did not come from the remote peer */
3877 op);
3878#if MEASURE_PERFORMANCE
3879 perf_store.full_done.sent += 1;
3880#endif
3882 GNUNET_MQ_send (op->mq,
3883 ev);
3884 op->phase = PHASE_FINISHED;
3885 /* we now wait until the other peer sends us the OVER message*/
3886 }
3887 break;
3888
3889 case PHASE_FULL_SENDING:
3890 {
3892 "got FULL DONE, finishing\n");
3893 /* We sent the full set, and got the response for that. We're done. */
3894 op->phase = PHASE_FINISHED;
3895 GNUNET_CADET_receive_done (op->channel);
3898 return;
3899 }
3900
3901 default:
3903 "Handle full done phase is %u\n",
3904 (unsigned) op->phase);
3905 GNUNET_break_op (0);
3907 return;
3908 }
/* Reached only from the first branch, which leaves the switch via
 * `break`. */
3909 GNUNET_CADET_receive_done (op->channel);
3910}
3911
3912
/**
 * Validation callback for a demand message.
 *
 * The payload behind the message header must be an exact multiple of
 * `sizeof (struct GNUNET_HashCode)`.  The operation pointer is fetched
 * but deliberately unused.
 *
 * NOTE(review): the line with the function name and the first parameter
 * is absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param mh the received message
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR otherwise
 */
3921static int
3923 const struct GNUNET_MessageHeader *mh)
3924{
3925 struct Operation *op = cls;
3926 unsigned int num_hashes;
3927
3928 (void) op;
3929 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3930 / sizeof(struct GNUNET_HashCode);
/* Reject payloads with trailing bytes that do not form a whole hash. */
3931 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3932 != num_hashes * sizeof(struct GNUNET_HashCode))
3933 {
3934 GNUNET_break_op (0);
3935 return GNUNET_SYSERR;
3936 }
3937 return GNUNET_OK;
3938}
3939
3940
/**
 * Handler for a demand message: the remote peer asks for the elements
 * behind a list of hashes.
 *
 * For every hash in the payload the element is looked up in the set's
 * element map; a demand for an unknown element fails the operation.
 * After two bookkeeping steps on op->message_control_flow and one further
 * test (condition not visible in this listing) the element is sent to the
 * peer.  After the loop the channel is acknowledged and maybe_finish() is
 * called.
 *
 * NOTE(review): the line with the function name and the first parameter,
 * and several call/log heads in the body, are absent from this listing.
 *
 * @param cls the `struct Operation *`
 * @param mh the received demand message
 */
3948static void
3950 const struct GNUNET_MessageHeader *mh)
3951{
3952 struct Operation *op = cls;
3953 struct ElementEntry *ee;
3954 struct GNUNET_SETU_ElementMessage *emsg;
3955 const struct GNUNET_HashCode *hash;
3956 unsigned int num_hashes;
3957 struct GNUNET_MQ_Envelope *ev;
3958
3962 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3964 if (GNUNET_OK !=
3965 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3966 {
3967 GNUNET_break (0);
3969 return;
3970 }
3971#if MEASURE_PERFORMANCE
3972 perf_store.demand.received += 1;
3973 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3975#endif
3976
3977 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3978 / sizeof(struct GNUNET_HashCode);
/* Walk the hashes stored back to back behind the message header. */
3979 for (hash = (const struct GNUNET_HashCode *) &mh[1];
3980 num_hashes > 0;
3981 hash++, num_hashes--)
3982 {
3983 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3984 hash);
3985 if (NULL == ee)
3986 {
3987 /* Demand for non-existing element. */
3988 GNUNET_break_op (0);
3990 return;
3991 }
3992
3993 /* Save send demand message for message control */
3994 if (GNUNET_YES !=
3996 op->message_control_flow,
3998 &ee->element_hash,
4000 )
4001 {
4003 "Double demand message received found!\n");
4004 GNUNET_break (0);
4006 return;
4007 }
/* NOTE(review): the lone `;` below is an empty statement. */
4008 ;
4009
4010 /* Mark element to be expected to received */
4011 if (GNUNET_YES !=
4013 op->message_control_flow,
4015 &ee->element_hash,
4017 )
4018 {
4020 "Double element message sent found!\n");
4021 GNUNET_break (0);
4023 return;
4024 }
/* NOTE(review): the `if` condition guarding this block is missing. */
4026 {
4027 /* Probably confused lazily copied sets. */
4028 GNUNET_break_op (0);
4030 return;
4031 }
4032#if MEASURE_PERFORMANCE
4033 perf_store.element.sent += 1;
4034 perf_store.element.sent_var_bytes += ee->element.size;
4035#endif
4036 ev = GNUNET_MQ_msg_extra (emsg,
4037 ee->element.size,
4039 GNUNET_memcpy (&emsg[1],
4040 ee->element.data,
4041 ee->element.size);
4042 emsg->reserved = htons (0);
4043 emsg->element_type = htons (ee->element.element_type);
4045 "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
4046 op,
4047 (unsigned int) ee->element.size,
4048 GNUNET_h2s (&ee->element_hash));
4049 GNUNET_MQ_send (op->mq, ev);
4051 "# exchanged elements",
4052 1,
4053 GNUNET_NO);
/* NOTE(review): the head of the call made when op->symmetric is set is
 * missing; it takes the element as an argument. */
4054 if (op->symmetric)
4056 &ee->element,
4058 }
4059 GNUNET_CADET_receive_done (op->channel);
4060 maybe_finish (op);
4061}
4062
4063
/**
 * Validity check for a demand message from the remote peer.
 * NOTE(review): the line carrying the function name is elided in this
 * listing; presumably this is the check half of the union_p2p_demand
 * handler pair -- confirm against the original file.
 *
 * The message is accepted only while the operation is in
 * PHASE_PASSIVE_DECODING or PHASE_ACTIVE_DECODING, and only if the payload
 * after the header is an exact multiple of sizeof(struct GNUNET_HashCode).
 *
 * @param cls the `struct Operation *` the message was received for
 * @param mh the message to validate
 * @return #GNUNET_OK if @a mh is acceptable, #GNUNET_SYSERR otherwise
 */
4071static int
4073 const struct GNUNET_MessageHeader *mh)
4074{
4075 struct Operation *op = cls;
4076 unsigned int num_hashes;
4077
4078 /* demands are only acceptable while decoding (in either role) */
4079 if ((op->phase != PHASE_PASSIVE_DECODING) &&
4080 (op->phase != PHASE_ACTIVE_DECODING))
4081 {
4082 GNUNET_break_op (0);
4083 return GNUNET_SYSERR;
4084 }
/* The division truncates, so multiplying back only reproduces the payload
   size when the payload holds whole hash codes and nothing else. */
4085 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4086 / sizeof(struct GNUNET_HashCode);
4087 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4088 num_hashes * sizeof(struct GNUNET_HashCode))
4089 {
4090 GNUNET_break_op (0);
4091 return GNUNET_SYSERR;
4092 }
4093 return GNUNET_OK;
4094}
4095
4096
/**
 * Handle an offer message from the remote peer: a list of hash codes.
 * NOTE(review): the line carrying the function name and several call
 * lines are elided in this listing; presumably this is
 * handle_union_p2p_offer -- confirm against the original file.
 *
 * For every offered hash that is not already present in
 * op->set->content->elements, and for which the (partly elided) duplicate
 * check does not fire, one demand message carrying that single hash is
 * queued on op->mq.  Bookkeeping in op->message_control_flow guards against
 * duplicate demands/offers; a violation aborts processing of the message.
 *
 * @param cls the `struct Operation *` the message was received for
 * @param mh the offer message; the hash codes follow the header
 */
4104static void
4106 const struct GNUNET_MessageHeader *mh)
4107{
4108 struct Operation *op = cls;
4109 const struct GNUNET_HashCode *hash;
4110 unsigned int num_hashes;
/* offers are only valid while decoding (in either role) */
4114 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4115 if (GNUNET_OK !=
4116 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4117 {
4118 GNUNET_break (0);
4120 return;
4121 }
4122
4123#if MEASURE_PERFORMANCE
4124 perf_store.offer.received += 1;
4125 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4127#endif
4128
/* walk the hash codes stored directly behind the message header */
4129 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4130 / sizeof(struct GNUNET_HashCode);
4131 for (hash = (const struct GNUNET_HashCode *) &mh[1];
4132 num_hashes > 0;
4133 hash++, num_hashes--)
4134 {
4135 struct ElementEntry *ee;
4136 struct GNUNET_MessageHeader *demands;
4137 struct GNUNET_MQ_Envelope *ev;
4138
4139 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4140 hash);
/* an entry already exists for this hash: nothing to demand */
4141 if (NULL != ee)
4143 continue;
4144
4145 if (GNUNET_YES ==
4147 hash))
4148 {
4150 "Skipped sending duplicate demand\n");
4151 continue;
4152 }
4153
4156 op->demanded_hashes,
4157 hash,
4158 NULL,
4160
4162 "[OP %p] Requesting element (hash %s)\n",
4163 op, GNUNET_h2s (hash));
4164
4165#if MEASURE_PERFORMANCE
4166 perf_store.demand.sent += 1;
4167 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4168#endif
4169 /* Record the demand we are about to send for message flow control */
4170 if (GNUNET_YES !=
4172 op->message_control_flow,
4174 hash,
4176 {
4178 "Double demand message sent found!\n");
4179 GNUNET_break (0);
4181 return;
4182 }
4183
4184 /* Mark the offer as received */
4185 if (GNUNET_YES !=
4187 op->message_control_flow,
4189 hash,
4191 {
4193 "Double offer message received found!\n");
4194 GNUNET_break (0);
4196 return;
4197 }
4198 /* Mark the element as expected to be received */
4199 if (GNUNET_YES !=
4201 op->message_control_flow,
4203 hash,
4205 {
4207 "Element already expected!\n");
4208 GNUNET_break (0);
4210 return;
4211 }
/* one demand message per hash: header followed by the hash code */
4212 ev = GNUNET_MQ_msg_header_extra (demands,
4213 sizeof(struct GNUNET_HashCode),
4215 GNUNET_memcpy (&demands[1],
4216 hash,
4217 sizeof(struct GNUNET_HashCode));
4218 GNUNET_MQ_send (op->mq, ev);
4219 }
4220 GNUNET_CADET_receive_done (op->channel);
4221}
4222
4223
/**
 * Handle a done message from the remote peer.
 * NOTE(review): the function-name line and both `case` labels of the
 * switch are elided in this listing; presumably this is
 * handle_union_p2p_done -- confirm against the original file.
 *
 * The message is only processed while decoding (active or passive) and
 * only if no active/passive role switch is pending.  Judging by the log
 * texts, the first switch branch is the passive side (phase becomes
 * PHASE_FINISH_WAITING) and the second the active side (phase becomes
 * PHASE_FINISH_CLOSING); both acknowledge the message to CADET and call
 * maybe_finish().
 *
 * @param cls the `struct Operation *` the message was received for
 * @param mh the done message (not inspected in the visible code outside
 *           of the elided lines)
 */
4230static void
4232 const struct GNUNET_MessageHeader *mh)
4233{
4234 struct Operation *op = cls;
4235
4239 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4240 if (GNUNET_OK !=
4241 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4242 {
4243 GNUNET_break (0);
4245 return;
4246 }
4247
/* a pending role switch means the peer must not finish yet */
4248 if (op->active_passive_switch_required)
4249 {
4251 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4252 GNUNET_break (0);
4254 return;
4255 }
4256
4257#if MEASURE_PERFORMANCE
4258 perf_store.done.received += 1;
4259#endif
4260 switch (op->phase)
4261 {
4263 /* We got all requests, but still have to send our elements in response. */
4264 op->phase = PHASE_FINISH_WAITING;
4266 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4267 /* The active peer is done sending offers
4268 * and inquiries. This means that all
4269 * our responses to that (demands and offers)
4270 * must be in flight (queued or in mesh).
4271 *
4272 * We should notify the active peer once
4273 * all our demands are satisfied, so that the active
4274 * peer can quit if we gave it everything.
4275 */GNUNET_CADET_receive_done (op->channel);
4276 maybe_finish (op);
4277 return;
4280 "got DONE (as active partner), waiting to finish\n");
4281 /* All demands of the other peer are satisfied,
4282 * and we processed all offers, thus we know
4283 * exactly what our demands must be.
4284 *
4285 * We'll close the channel
4286 * to the other peer once our demands are met.
4287 */op->phase = PHASE_FINISH_CLOSING;
4288 GNUNET_CADET_receive_done (op->channel);
4289 maybe_finish (op);
4290 return;
4291 default:
4292 GNUNET_break_op (0);
4294 return;
4295 }
4296}
4297
4298
/**
 * Handle an over message from the remote peer by calling
 * send_client_done() with the handler closure.
 * NOTE(review): the function-name line is elided in this listing;
 * presumably this is handle_union_p2p_over -- confirm.
 *
 * @param cls closure, passed on unchanged to send_client_done()
 * @param mh the over message (not inspected)
 */
4305static void
4307 const struct GNUNET_MessageHeader *mh)
4308{
4309#if MEASURE_PERFORMANCE
4310 perf_store.over.received += 1;
4311#endif
4312 send_client_done (cls);
4313}
4314
4315
/**
 * Look up an incoming operation by the id under which it was suggested.
 *
 * Walks the listeners starting at listener_head and, for each, its list
 * of operations, comparing op->suggest_id against @a id.
 * NOTE(review): the advance expression of the outer loop is elided in
 * this listing.
 *
 * @param id the suggest id to search for
 * @return the first matching operation, or NULL if none matches
 */
4324static struct Operation *
4325get_incoming (uint32_t id)
4326{
4327 for (struct Listener *listener = listener_head;
4328 NULL != listener;
4330 {
4331 for (struct Operation *op = listener->op_head;
4332 NULL != op;
4333 op = op->next)
4334 if (op->suggest_id == id)
4335 return op;
4336 }
4337 return NULL;
4338}
4339
4340
/**
 * Set up per-client state for a newly connected client.
 * NOTE(review): the function-name line and the first parameter are elided
 * in this listing; presumably this is the service's client-connect
 * callback -- confirm.
 *
 * Increments the global client counter and allocates a `struct ClientState`
 * that remembers the client handle and its message queue.
 *
 * @param c the client handle
 * @param mq the message queue for talking to @a c
 * @return the freshly allocated `struct ClientState`
 */
4349static void *
4351 struct GNUNET_SERVICE_Client *c,
4352 struct GNUNET_MQ_Handle *mq)
4353{
4354 struct ClientState *cs;
4355
4356 num_clients++;
4357 cs = GNUNET_new (struct ClientState);
4358 cs->client = c;
4359 cs->mq = mq;
4360 return cs;
4361}
4362
4363
/**
 * Hash map iterator that frees one element entry.
 * NOTE(review): the function-name line and the first parameter are elided
 * in this listing; presumably this is destroy_elements_iterator -- confirm.
 *
 * @param key hash of the element (unused)
 * @param value the `struct ElementEntry *` to free
 * @return #GNUNET_YES (always), i.e. the same value for every entry
 */
4372static int
4374 const struct GNUNET_HashCode *key,
4375 void *value)
4376{
4377 struct ElementEntry *ee = value;
4378
4379 GNUNET_free (ee);
4380 return GNUNET_YES;
4381}
4382
4383
/**
 * Release everything that belongs to a disconnecting client.
 * NOTE(review): the function-name line, the first parameter and most of
 * the destroy/cleanup calls are elided in this listing; presumably this is
 * the service's client-disconnect callback -- confirm.
 *
 * Visible behaviour:
 *  - if the client owns a set: loop while the set still has operations
 *    (loop body elided), clear set->se, drop one reference on the shared
 *    set content and free the content when the count reaches zero, then
 *    free the set;
 *  - if the client owns a listener: clear its open_port, loop over its
 *    pending incoming operations (destroy call elided), free the listener;
 *  - free the client state and decrement num_clients;
 *  - if a shutdown is in progress and this was the last client, reset the
 *    global cadet handle to NULL (the preceding call is elided).
 *
 * @param client the client handle
 * @param internal_cls the `struct ClientState *` of the client
 */
4391static void
4393 struct GNUNET_SERVICE_Client *client,
4394 void *internal_cls)
4395{
4396 struct ClientState *cs = internal_cls;
4397 struct Operation *op;
4398 struct Listener *listener;
4399 struct Set *set;
4400
4402 "Client disconnected, cleaning up\n");
4403 if (NULL != (set = cs->set))
4404 {
4405 struct SetContent *content = set->content;
4406
4408 "Destroying client's set\n");
4409 /* Destroy pending set operations */
4410 while (NULL != set->ops_head)
4412
4413 /* Destroy operation-specific state */
4414 if (NULL != set->se)
4415 {
4417 set->se = NULL;
4418 }
4419 /* free set content (or at least decrement RC) */
4420 set->content = NULL;
4421 GNUNET_assert (0 != content->refcount);
4422 content->refcount--;
4423 if (0 == content->refcount)
4424 {
4425 GNUNET_assert (NULL != content->elements);
4428 NULL);
4430 content->elements = NULL;
4431 GNUNET_free (content);
4432 }
4433 GNUNET_free (set);
4434 }
4435
4436 if (NULL != (listener = cs->listener))
4437 {
4439 "Destroying client's listener\n");
4441 listener->open_port = NULL;
4442 while (NULL != (op = listener->op_head))
4443 {
4445 "Destroying incoming operation `%u' from peer `%s'\n",
4446 (unsigned int) op->client_request_id,
4447 GNUNET_i2s (&op->peer));
4449 }
4452 listener);
4453 GNUNET_free (listener);
4454 }
4455 GNUNET_free (cs);
4456 num_clients--;
/* last client gone during shutdown: let go of the CADET handle */
4457 if ( (GNUNET_YES == in_shutdown) &&
4458 (0 == num_clients) )
4459 {
4460 if (NULL != cadet)
4461 {
4463 cadet = NULL;
4464 }
4465 }
4466}
4467
4468
/**
 * Validity check for an operation request received on an incoming channel.
 * NOTE(review): the function-name line is elided in this listing;
 * presumably this is check_incoming_msg -- confirm.
 *
 * Rejects the message if the operation already has a suggest id (a second
 * request on the same channel), if the operation has no listener, or if
 * the nested context message exceeds GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE.
 *
 * @param cls the `struct Operation *` of the incoming channel
 * @param msg the request to validate
 * @return #GNUNET_OK if @a msg is acceptable, #GNUNET_SYSERR otherwise
 */
4477static int
4479 const struct OperationRequestMessage *msg)
4480{
4481 struct Operation *op = cls;
4482 struct Listener *listener = op->listener;
4483 const struct GNUNET_MessageHeader *nested_context;
4484
4485 /* double operation request */
4486 if (0 != op->suggest_id)
4487 {
4488 GNUNET_break_op (0);
4489 return GNUNET_SYSERR;
4490 }
4491 /* This should be equivalent to the previous condition, but can't hurt to check twice */
4492 if (NULL == listener)
4493 {
4494 GNUNET_break (0);
4495 return GNUNET_SYSERR;
4496 }
/* a nested context is optional, but if present it must not be oversized */
4497 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4498 if ((NULL != nested_context) &&
4499 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4500 {
4501 GNUNET_break_op (0);
4502 return GNUNET_SYSERR;
4503 }
4504 return GNUNET_OK;
4505}
4506
4507
/**
 * Handle an operation request from a remote peer and suggest it to the
 * listening client.
 * NOTE(review): the function-name line, the log-level arguments and the
 * call that creates `env`/`cmsg` are elided in this listing; presumably
 * this is handle_incoming_msg -- confirm.
 *
 * Keeps a copy of the nested context message (if any), records the remote
 * element count, assigns the operation a fresh non-zero suggest id from
 * the global counter, cancels the pending timeout task and sends a request
 * message carrying the accept id and the peer identity to the listener's
 * client.  CADET is not acknowledged here (see the note at the end).
 *
 * @param cls the `struct Operation *` of the incoming channel
 * @param msg the operation request
 */
4523static void
4525 const struct OperationRequestMessage *msg)
4526{
4527 struct Operation *op = cls;
4528 struct Listener *listener = op->listener;
4529 const struct GNUNET_MessageHeader *nested_context;
4530 struct GNUNET_MQ_Envelope *env;
4531 struct GNUNET_SETU_RequestMessage *cmsg;
4532
4533 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4534 /* Make a copy of the nested_context (application-specific context
4535 information that is opaque to set) so we can pass it to the
4536 listener later on */
4537 if (NULL != nested_context)
4538 op->context_msg = GNUNET_copy_message (nested_context);
4539 op->remote_element_count = ntohl (msg->element_count);
4540 GNUNET_log (
4542 "Received P2P operation request (port %s) for active listener\n",
4543 GNUNET_h2s (&op->listener->app_id));
4544 GNUNET_assert (0 == op->suggest_id);
/* 0 marks "no id assigned yet", so skip it when the counter sits at 0 */
4545 if (0 == suggest_id)
4546 suggest_id++;
4547 op->suggest_id = suggest_id++;
4548 GNUNET_assert (NULL != op->timeout_task);
4549 GNUNET_SCHEDULER_cancel (op->timeout_task);
4550 op->timeout_task = NULL;
4553 op->context_msg);
4554 GNUNET_log (
4556 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4557 op->suggest_id,
4558 listener,
4559 listener->cs);
4560 cmsg->accept_id = htonl (op->suggest_id);
4561 cmsg->peer_id = op->peer;
4562 GNUNET_MQ_send (listener->cs->mq,
4563 env);
4564 /* NOTE: GNUNET_CADET_receive_done() will be called in
4565 #handle_client_accept() */
4566}
4567
4568
/**
 * Called when a client wants to create a new set.
 * NOTE(review): the function-name line, the strata-estimator creation
 * call, the element-map creation call and the client continue/drop calls
 * are elided in this listing.
 *
 * A client may own at most one set; a second create request is treated as
 * an error.  Otherwise a `struct Set` with a strata estimator and a fresh
 * `struct SetContent` (reference count 1) is allocated and linked with the
 * client state in both directions.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the create message (not inspected in the visible code)
 */
4577static void
4579 const struct GNUNET_SETU_CreateMessage *msg)
4580{
4581 struct ClientState *cs = cls;
4582 struct Set *set;
4583
4585 "Client created new set for union operation\n");
4586 if (NULL != cs->set)
4587 {
4588 /* There can only be one set per client */
4589 GNUNET_break (0);
4591 return;
4592 }
4593 set = GNUNET_new (struct Set);
4594 {
4595 struct MultiStrataEstimator *se;
4596
/* the call that assigns `se` is elided in this listing */
4600 if (NULL == se)
4601 {
4603 "Failed to allocate strata estimator\n");
4604 GNUNET_free (set);
4606 return;
4607 }
4608 set->se = se;
4609 }
4610 set->content = GNUNET_new (struct SetContent);
4611 set->content->refcount = 1;
4613 GNUNET_YES);
4614 set->cs = cs;
4615 cs->set = set;
4617}
4618
4619
/**
 * Scheduler task run when an incoming request was not handled in time.
 * NOTE(review): the function-name line and the final call are elided in
 * this listing; presumably this is incoming_timeout_cb and it ends by
 * destroying the operation -- confirm.
 *
 * Clears op->timeout_task (the task is running, so the handle is no
 * longer valid) and logs the timeout.
 *
 * @param cls the `struct Operation *` that timed out
 */
4629static void
4631{
4632 struct Operation *op = cls;
4633
4634 op->timeout_task = NULL;
4636 "Remote peer's incoming request timed out\n");
4638}
4639
4640
/**
 * Create the operation state for a new incoming CADET channel.
 * NOTE(review): the function-name line, the first two parameters and three
 * calls (only their trailing arguments `UINT32_MAX` / `op` remain) are
 * elided in this listing; presumably this is channel_new_cb -- confirm.
 *
 * Allocates a `struct Operation`, binds it to the listener, the remote
 * peer, the channel and the channel's message queue.
 *
 * @param source identity of the peer that opened the channel
 * @return the new `struct Operation`, used as the channel's context
 */
4657static void *
4660 const struct GNUNET_PeerIdentity *source)
4661{
4662 struct Listener *listener = cls;
4663 struct Operation *op;
4664
4666 "New incoming channel\n");
4667 op = GNUNET_new (struct Operation);
4668 op->listener = listener;
4669 op->peer = *source;
4670 op->channel = channel;
4671 op->mq = GNUNET_CADET_get_mq (op->channel);
4673 UINT32_MAX);
4676 op);
4679 op);
4680 return op;
4681}
4682
4683
/**
 * Called when a CADET channel ends.
 * NOTE(review): the statement following the assignment is elided in this
 * listing; presumably it tears down the operation -- confirm.
 *
 * Clears op->channel so that later code does not use the dead channel.
 *
 * @param channel_ctx the `struct Operation *` bound to the channel
 * @param channel the channel that ended (not used in the visible code)
 */
4700static void
4701channel_end_cb (void *channel_ctx,
4702 const struct GNUNET_CADET_Channel *channel)
4703{
4704 struct Operation *op = channel_ctx;
4705
4706 op->channel = NULL;
4708}
4709
4710
/**
 * Window-size notification for a CADET channel; intentionally a no-op.
 * NOTE(review): the function-name line and first parameter are elided in
 * this listing; presumably this is channel_window_cb -- confirm.
 *
 * @param channel the channel whose window changed (unused)
 * @param window_size the new window size (unused)
 */
4725static void
4727 const struct GNUNET_CADET_Channel *channel,
4728 int window_size)
4729{
4730 /* FIXME: not implemented, we could do flow control here... */
4731}
4732
4733
/**
 * Called when a client wants to listen for incoming set operations.
 * NOTE(review): the function-name line, the message-type constants of the
 * handler table, the table terminator and the port-opening call are elided
 * in this listing; presumably this is handle_client_listen -- confirm.
 *
 * A client may have at most one listener.  Otherwise a `struct Listener`
 * is allocated, linked with the client state, given the application id
 * from the message, and registered (partly elided) together with the
 * handler table below.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the listen message, providing the application id
 */
4741static void
4743 const struct GNUNET_SETU_ListenMessage *msg)
4744{
4745 struct ClientState *cs = cls;
/* P2P handlers for channels arriving on this listener; all closures are
   NULL in this table */
4746 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4747 GNUNET_MQ_hd_var_size (incoming_msg,
4750 NULL),
4751 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4753 struct IBFMessage,
4754 NULL),
4755 GNUNET_MQ_hd_var_size (union_p2p_elements,
4758 NULL),
4759 GNUNET_MQ_hd_var_size (union_p2p_offer,
4761 struct GNUNET_MessageHeader,
4762 NULL),
4763 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4765 struct InquiryMessage,
4766 NULL),
4767 GNUNET_MQ_hd_var_size (union_p2p_demand,
4769 struct GNUNET_MessageHeader,
4770 NULL),
4771 GNUNET_MQ_hd_fixed_size (union_p2p_done,
4773 struct GNUNET_MessageHeader,
4774 NULL),
4775 GNUNET_MQ_hd_fixed_size (union_p2p_over,
4777 struct GNUNET_MessageHeader,
4778 NULL),
4779 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4781 struct GNUNET_MessageHeader,
4782 NULL),
4783 GNUNET_MQ_hd_var_size (union_p2p_request_full,
4785 struct TransmitFullMessage,
4786 NULL),
4787 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4790 NULL),
4791 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4794 NULL),
4795 GNUNET_MQ_hd_var_size (union_p2p_full_element,
4798 NULL),
4799 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4801 struct TransmitFullMessage,
4802 NULL),
4804 };
4805 struct Listener *listener;
4806
4807 if (NULL != cs->listener)
4808 {
4809 /* max. one active listener per client! */
4810 GNUNET_break (0);
4812 return;
4813 }
4814 listener = GNUNET_new (struct Listener);
4815 listener->cs = cs;
4816 cs->listener = listener;
4817 listener->app_id = msg->app_id;
4820 listener);
4822 "New listener created (port %s)\n",
4823 GNUNET_h2s (&listener->app_id));
4825 &msg->app_id,
4827 listener,
4830 cadet_handlers);
4832}
4833
4834
/**
 * Called when the listening client rejects an operation request by
 * another peer.
 * NOTE(review): the function-name line, the log-level arguments and the
 * calls after each log statement are elided in this listing.
 *
 * Looks up the incoming operation by the id from the message.  An unknown
 * id is not an error (the peer may already have disconnected); it is only
 * logged.
 *
 * NOTE(review): the log statement for the found case dereferences
 * cs->listener without a NULL check, while get_incoming() searches the
 * operations of all listeners -- confirm that a rejecting client always
 * has a listener.
 *
 * @param cls the `struct ClientState *` of the rejecting client
 * @param msg the reject message, carrying the accept/reject id
 */
4842static void
4844 const struct GNUNET_SETU_RejectMessage *msg)
4845{
4846 struct ClientState *cs = cls;
4847 struct Operation *op;
4848
4849 op = get_incoming (ntohl (msg->accept_reject_id));
4850 if (NULL == op)
4851 {
4852 /* no matching incoming operation for this reject;
4853 could be that the other peer already disconnected... */
4855 "Client rejected unknown operation %u\n",
4856 (unsigned int) ntohl (msg->accept_reject_id));
4858 return;
4859 }
4861 "Peer request (app %s) rejected by client\n",
4862 GNUNET_h2s (&cs->listener->app_id));
4865}
4866
4867
/**
 * Validity check for a client's element message.  No checks are performed
 * at present; every message is accepted.
 * NOTE(review): the function-name line is elided in this listing;
 * presumably this is check_client_set_add -- confirm.
 *
 * @param msg the element message (not inspected)
 * @return #GNUNET_OK (always)
 */
4874static int
4876 const struct GNUNET_SETU_ElementMessage *msg)
4877{
4878 /* NOTE: Technically, we should probably check with the
4879 block library whether the element we are given is well-formed */
4880 return GNUNET_OK;
4881}
4882
4883
/**
 * Called when a client wants to add an element to its set.
 * NOTE(review): the function-name line, the hashing call, the map lookup,
 * the map insertion and the final call (of which only the get_ibf_key()
 * argument remains) are elided in this listing; presumably this is
 * handle_client_set_add -- confirm.
 *
 * The element is described by the payload following @a msg.  If no entry
 * with the element's hash exists yet, a `struct ElementEntry` is allocated
 * with the element data stored directly behind it and tagged with the
 * set's current generation.  Inserting an element twice is logged and
 * otherwise ignored.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the element message; the element data follows the struct
 */
4890static void
4892 const struct GNUNET_SETU_ElementMessage *msg)
4893{
4894 struct ClientState *cs = cls;
4895 struct Set *set;
4896 struct GNUNET_SETU_Element el;
4897 struct ElementEntry *ee;
4898 struct GNUNET_HashCode hash;
4899
4900 if (NULL == (set = cs->set))
4901 {
4902 /* client without a set requested an operation */
4903 GNUNET_break (0);
4905 return;
4906 }
4908 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
/* the element is whatever follows the fixed-size part of the message */
4909 el.size = ntohs (msg->header.size) - sizeof(*msg);
4910 el.data = &msg[1];
4911 el.element_type = ntohs (msg->element_type);
4913 &hash);
4915 &hash);
4916 if (NULL == ee)
4917 {
4919 "Client inserts element %s of size %u\n",
4920 GNUNET_h2s (&hash),
4921 el.size);
/* single allocation: entry header followed by a private copy of the data */
4922 ee = GNUNET_malloc (el.size + sizeof(*ee));
4923 ee->element.size = el.size;
4924 GNUNET_memcpy (&ee[1], el.data, el.size);
4925 ee->element.data = &ee[1];
4926 ee->element.element_type = el.element_type;
4927 ee->remote = GNUNET_NO;
4928 ee->generation = set->current_generation;
4929 ee->element_hash = hash;
4932 set->content->elements,
4933 &ee->element_hash,
4934 ee,
4936 }
4937 else
4938 {
4940 "Client inserted element %s of size %u twice (ignored)\n",
4941 GNUNET_h2s (&hash),
4942 el.size);
4943 /* same element inserted twice */
4944 return;
4945 }
4947 get_ibf_key (&ee->element_hash));
4948}
4949
4950
/**
 * Advance the generation counters of a set: both the latest generation of
 * the shared set content and the set's own current generation are
 * incremented by one.
 * NOTE(review): the line with the function name and parameter list is
 * elided in this listing; callers visible elsewhere in the file invoke it
 * as advance_generation (set).
 */
4957static void
4959{
4960 set->content->latest_generation++;
4961 set->current_generation++;
4962}
4963
4964
/**
 * Validity check for a client's evaluate message.  No checks are performed
 * at present; every message is accepted (see the FIXME below).
 * NOTE(review): the function-name line is elided in this listing;
 * presumably this is check_client_evaluate -- confirm.
 *
 * @param msg the evaluate message (not inspected)
 * @return #GNUNET_OK (always)
 */
4974static int
4976 const struct GNUNET_SETU_EvaluateMessage *msg)
4977{
4978 /* FIXME: suboptimal, even if the context below could be NULL,
4979 there are malformed messages this does not check for... */
4980 return GNUNET_OK;
4981}
4982
4983
/**
 * Called when a client wants to initiate a set operation with another
 * peer.
 * NOTE(review): the function-name line, the message-type constants of the
 * handler table, the assignment of `context`, the channel-creation call,
 * the statistics/log call heads and the client continue/drop calls are
 * elided in this listing.
 *
 * Allocates a `struct Operation`, copies the operation parameters from the
 * message, creates the bookkeeping hash maps, pins the operation to the
 * set's current generation (and then advances the generation), sends an
 * operation request (with the optional nested context message) over the
 * operation's message queue and enters PHASE_EXPECT_SE.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the evaluate message with target peer, application id and
 *            operation parameters
 */
4992static void
4994 const struct GNUNET_SETU_EvaluateMessage *msg)
4995{
4996 struct ClientState *cs = cls;
4997 struct Operation *op = GNUNET_new (struct Operation);
4998
4999 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
5000 GNUNET_MQ_hd_var_size (incoming_msg,
5003 op),
5004 GNUNET_MQ_hd_var_size (union_p2p_ibf,
5006 struct IBFMessage,
5007 op),
5008 GNUNET_MQ_hd_var_size (union_p2p_elements,
5011 op),
5012 GNUNET_MQ_hd_var_size (union_p2p_offer,
5014 struct GNUNET_MessageHeader,
5015 op),
5016 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
5018 struct InquiryMessage,
5019 op),
5020 GNUNET_MQ_hd_var_size (union_p2p_demand,
5022 struct GNUNET_MessageHeader,
5023 op),
5024 GNUNET_MQ_hd_fixed_size (union_p2p_done,
5026 struct GNUNET_MessageHeader,
5027 op),
5028 GNUNET_MQ_hd_fixed_size (union_p2p_over,
5030 struct GNUNET_MessageHeader,
5031 op),
5032 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
5034 struct GNUNET_MessageHeader,
5035 op),
5036 GNUNET_MQ_hd_var_size (union_p2p_request_full,
5038 struct TransmitFullMessage,
5039 op),
5040 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5043 op),
5044 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5047 op),
5048 GNUNET_MQ_hd_var_size (union_p2p_full_element,
5051 op),
/* NOTE(review): this entry passes NULL as closure while every other entry
   passes op -- confirm whether the closure is overridden when the channel
   is created, otherwise this looks inconsistent. */
5052 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5054 struct TransmitFullMessage,
5055 NULL),
5057 };
5058 struct Set *set;
5059 const struct GNUNET_MessageHeader *context;
5060
5061 if (NULL == (set = cs->set))
5062 {
5063 GNUNET_break (0);
5064 GNUNET_free (op);
5066 return;
5067 }
5069 UINT32_MAX);
/* copy the operation parameters chosen by the client */
5070 op->peer = msg->target_peer;
5071 op->client_request_id = ntohl (msg->request_id);
5072 op->byzantine = msg->byzantine;
5073 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5074 op->force_full = msg->force_full;
5075 op->force_delta = msg->force_delta;
5076 op->symmetric = msg->symmetric;
5077 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5078 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5079 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5080 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5081 op->active_passive_switch_required = false;
5083
5084 /* create hashmap for message control */
5085 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5086 GNUNET_NO);
5087 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5088
5089#if MEASURE_PERFORMANCE
5090 /* load config */
5091 load_config (op);
5092#endif
5093
5094 /* Advance generation values, so that
5095 mutations won't interfere with the running operation. */
5096 op->set = set;
5097 op->generation_created = set->current_generation;
5098 advance_generation (set);
5100 set->ops_tail,
5101 op);
5103 "Creating new CADET channel to port %s for set union\n",
5104 GNUNET_h2s (&msg->app_id));
5106 op,
5107 &msg->target_peer,
5108 &msg->app_id,
5111 cadet_handlers);
5112 op->mq = GNUNET_CADET_get_mq (op->channel);
5113 {
5114 struct GNUNET_MQ_Envelope *ev;
5115 struct OperationRequestMessage *msg_tmp;
5116
5117#if MEASURE_PERFORMANCE
5118 perf_store.operation_request.sent += 1;
5119#endif
5120 ev = GNUNET_MQ_msg_nested_mh (msg_tmp,
5122 context);
5123 if (NULL == ev)
5124 {
5125 /* the context message is too large */
5126 GNUNET_break (0);
5128 return;
5129 }
5130 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5131 GNUNET_NO);
5132 /* copy the current generation's strata estimator for this operation */
5133 op->se = strata_estimator_dup (op->set->se);
5134 /* we started the operation, thus we have to send the operation request */
5135 op->phase = PHASE_EXPECT_SE;
5136
5137 op->salt_receive = (op->peer_site + 1) % 2;
5138 op->salt_send = op->peer_site; // FIXME?????
5139
5140
5142 "Initiating union operation evaluation\n");
5144 "# of total union operations",
5145 1,
5146 GNUNET_NO);
5148 "# of initiated union operations",
5149 1,
5150 GNUNET_NO);
5151 GNUNET_MQ_send (op->mq,
5152 ev);
5153 if (NULL != context)
5155 "sent op request with context message\n");
5156 else
5158 "sent op request without context message\n");
5161 op->key_to_element);
5162
5163 }
5165}
5166
5167
/**
 * Handle a request from the client to cancel a running set operation.
 * NOTE(review): the function-name line, the log call heads, the call that
 * acts on the found operation and the client continue/drop calls are
 * elided in this listing.
 *
 * Searches the operations of the client's set for one whose client request
 * id matches the id in the message.  Not finding it is not a hard error
 * (see the comment in the body); both outcomes are logged.
 *
 * @param cls the `struct ClientState *` of the requesting client
 * @param msg the cancel message, carrying the request id
 */
5174static void
5176 const struct GNUNET_SETU_CancelMessage *msg)
5177{
5178 struct ClientState *cs = cls;
5179 struct Set *set;
5180 struct Operation *op;
5181 int found;
5182
5183 if (NULL == (set = cs->set))
5184 {
5185 /* client without a set requested an operation */
5186 GNUNET_break (0);
5188 return;
5189 }
/* linear search; on a hit `op` is left pointing at the match */
5190 found = GNUNET_NO;
5191 for (op = set->ops_head; NULL != op; op = op->next)
5192 {
5193 if (op->client_request_id == ntohl (msg->request_id))
5194 {
5195 found = GNUNET_YES;
5196 break;
5197 }
5198 }
5199 if (GNUNET_NO == found)
5200 {
5201 /* It may happen that the operation was already destroyed due to
5202 * the other peer disconnecting. The client may not know about this
5203 * yet and try to cancel the (just barely non-existent) operation.
5204 * So this is not a hard error.
5205 *///
5207 "Client canceled non-existent op %u\n",
5208 (uint32_t) ntohl (msg->request_id));
5209 }
5210 else
5211 {
5213 "Client requested cancel for op %u\n",
5214 (uint32_t) ntohl (msg->request_id));
5216 }
5218}
5219
5220
/**
 * Handle a request from the client to accept a set operation that came
 * from a remote peer.
 * NOTE(review): the function-name line, list-manipulation call heads,
 * statistics/log call heads, the buffer allocation, the choice of message
 * type and the client continue/drop calls are elided in this listing.
 *
 * If the operation no longer exists (e.g. the peer disconnected), a result
 * message with status GNUNET_SETU_STATUS_FAILURE is sent to the client.
 * Otherwise the operation is detached from its listener, attached to the
 * client's set, configured from the message, pinned to the set's current
 * generation, and kicked off by sending the strata estimator to the peer;
 * the phase then becomes PHASE_EXPECT_IBF.  Finally CADET is told that the
 * original request has been processed.
 *
 * @param cls the `struct ClientState *` of the accepting client
 * @param msg the accept message with the accept id and operation parameters
 */
5229static void
5231 const struct GNUNET_SETU_AcceptMessage *msg)
5232{
5233 struct ClientState *cs = cls;
5234 struct Set *set;
5235 struct Operation *op;
5236 struct GNUNET_SETU_ResultMessage *result_message;
5237 struct GNUNET_MQ_Envelope *ev;
5238 struct Listener *listener;
5239
5240 if (NULL == (set = cs->set))
5241 {
5242 /* client without a set requested to accept */
5243 GNUNET_break (0);
5245 return;
5246 }
5247 op = get_incoming (ntohl (msg->accept_reject_id));
5248 if (NULL == op)
5249 {
5250 /* It is not an error if the set op does not exist -- it may
5251 * have been destroyed when the partner peer disconnected. */
5252 GNUNET_log (
5254 "Client %p accepted request %u of listener %p that is no longer active\n",
5255 cs,
5256 ntohl (msg->accept_reject_id),
5257 cs->listener);
5258 ev = GNUNET_MQ_msg (result_message,
5260 result_message->request_id = msg->request_id;
5261 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5262 GNUNET_MQ_send (set->cs->mq, ev);
5264 return;
5265 }
5267 "Client accepting request %u\n",
5268 (uint32_t) ntohl (msg->accept_reject_id));
/* the operation changes owner: from the listener to the client's set */
5269 listener = op->listener;
5270 op->listener = NULL;
5272 listener->op_tail,
5273 op);
5274 op->set = set;
5276 set->ops_tail,
5277 op);
5278 op->client_request_id = ntohl (msg->request_id);
5279 op->byzantine = msg->byzantine;
5280 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5281 op->force_full = msg->force_full;
5282 op->force_delta = msg->force_delta;
5283 op->symmetric = msg->symmetric;
5284 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5285 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5286 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5287 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5288 op->active_passive_switch_required = false;
5289 /* create hashmap for message control */
5290 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5291 GNUNET_NO);
5292 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5293
5294#if MEASURE_PERFORMANCE
5295 /* load config */
5296 load_config (op);
5297#endif
5298
5299 /* Advance generation values, so that future mutations do not
5300 interfere with the running operation. */
5301 op->generation_created = set->current_generation;
5302 advance_generation (set);
5303 GNUNET_assert (NULL == op->se);
5304
5306 "accepting set union operation\n");
5308 "# of accepted union operations",
5309 1,
5310 GNUNET_NO);
5312 "# of total union operations",
5313 1,
5314 GNUNET_NO);
5315 {
5316 struct MultiStrataEstimator *se;
5317 struct GNUNET_MQ_Envelope *ev_tmp;
5318 struct StrataEstimatorMessage *strata_msg;
5319 char *buf;
5320 size_t len;
5321 uint16_t type;
5322
5323 op->se = strata_estimator_dup (op->set->se);
5324 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5325 GNUNET_NO);
5326 op->salt_receive = (op->peer_site + 1) % 2;
5327 op->salt_send = op->peer_site; // FIXME?????
5330 op->key_to_element);
5331
5332 /* kick off the operation */
5333 se = op->se;
5334
5335 {
5336 uint8_t se_count = 1;
5337 if (op->initial_size > 0)
5338 {
5339 op->total_elements_size_local = 0;
5340 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5341 &
5343 op);
5345 op->total_elements_size_local / op->initial_size,
5346 op->initial_size);
5347 }
5349 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5350 len = strata_estimator_write (se,
5352 se_count,
5353 buf);
5354#if MEASURE_PERFORMANCE
5355 perf_store.se.sent += 1;
5356 perf_store.se.sent_var_bytes += len;
5357#endif
5358
5359 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5362 else
/* the serialized estimator travels as the payload behind strata_msg */
5364 ev_tmp = GNUNET_MQ_msg_extra (strata_msg,
5365 len,
5366 type);
5367 GNUNET_memcpy (&strata_msg[1],
5368 buf,
5369 len);
5370 GNUNET_free (buf);
5371 strata_msg->set_size
5373 op->set->content->elements));
5374 strata_msg->se_count = se_count;
5375 GNUNET_MQ_send (op->mq,
5376 ev_tmp);
5377 op->phase = PHASE_EXPECT_IBF;
5378 }
5379 }
5380 /* Now allow CADET to continue, as we did not do this in
5381 #handle_incoming_msg (as we wanted to first see if the
5382 local client would accept the request). */
5383 GNUNET_CADET_receive_done (op->channel);
5385}
5386
5387
/**
 * Task run on service shutdown.
 * NOTE(review): several statements are elided in this listing (the one
 * right after the first comment, the call preceding `cadet = NULL`, and
 * the heads of the two calls near the end).
 *
 * If no clients are connected any more and a CADET handle exists, the
 * global handle is reset to NULL.  With MEASURE_PERFORMANCE enabled the
 * collected performance data is evaluated at the end.
 *
 * @param cls closure (not used in the visible code)
 */
5393static void
5394shutdown_task (void *cls)
5395{
5396 /* Delay actual shutdown to allow service to disconnect clients */
5398 if (0 == num_clients)
5399 {
5400 if (NULL != cadet)
5401 {
5403 cadet = NULL;
5404 }
5405 }
5407 GNUNET_YES);
5409 "handled shutdown request\n");
5410#if MEASURE_PERFORMANCE
5411 calculate_perf_store ();
5412#endif
5413}
5414
5415
/**
 * Service initialisation function.
 * NOTE(review): the last parameter and the heads of all calls in the body
 * are elided in this listing; from the remaining arguments and the error
 * text it presumably registers the shutdown task, sets up statistics and
 * connects to CADET -- confirm.
 *
 * If the global CADET handle is NULL after the (elided) set-up calls, an
 * error is logged and the function returns early.
 *
 * @param cls closure (not used in the visible code)
 * @param cfg the configuration to use
 */
5424static void
5425run (void *cls,
5426 const struct GNUNET_CONFIGURATION_Handle *cfg,
5428{
5429 /* FIXME: need to modify SERVICE (!) API to allow
5430 us to run a shutdown task *after* clients were
5431 forcefully disconnected! */
5433 NULL);
5435 cfg);
5437 if (NULL == cadet)
5438 {
5440 _ ("Could not connect to CADET service\n"));
5442 return;
5443 }
5444}
5445
5446
/* Argument list of the service entry-point registration.
   NOTE(review): the opening line of the invocation, the message-type
   constants and message structs of each handler, the list terminator and
   the closing parenthesis are elided in this listing.
   Visible arguments: the name "set", the init function run(), and one
   client-message handler each for accept, set_add, create_set, evaluate,
   listen, reject and cancel (set_add and evaluate are variable-size, the
   others fixed-size). */
5452 "set",
5454 &run,
5457 NULL,
5458 GNUNET_MQ_hd_fixed_size (client_accept,
5461 NULL),
5462 GNUNET_MQ_hd_var_size (client_set_add,
5465 NULL),
5466 GNUNET_MQ_hd_fixed_size (client_create_set,
5469 NULL),
5470 GNUNET_MQ_hd_var_size (client_evaluate,
5473 NULL),
5474 GNUNET_MQ_hd_fixed_size (client_listen,
5477 NULL),
5478 GNUNET_MQ_hd_fixed_size (client_reject,
5481 NULL),
5482 GNUNET_MQ_hd_fixed_size (client_cancel,
5485 NULL),
5487
5488
5489/* end of gnunet-service-setu.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:357
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:229
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:324
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:291
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:168
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:380
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:80
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:404
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
char * getenv()
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:143
static int ret
Final status code.
Definition: gnunet-arm.c:93
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:108
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:113
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_FS_Handle * ctx
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static char * name
Name (label) of the records to list.
static struct GNUNET_IDENTITY_EgoLookup * el
Handle to identity lookup.
static char * res
Currently read line or NULL on EOF.
static char * value
Value of the record to add/remove.
static uint32_t type
Type string converted to DNS type value.
static int status
The program status; 0 for success.
Definition: gnunet-nse.c:39
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calculations.
Definition: gnunet-scrypt.c:34
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
UnionOperationPhase
Current phase we are in for a union operation.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
#define IBF_MIN_SIZE
Minimal size of an IBF, based on the BSc thesis of Elias Summermatter (2021).
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
@ MSG_CFS_EXPECTED
Track that receiving this message is expected.
@ MSG_CFS_SENT
Track that a message has been sent.
@ MSG_CFS_UNINITIALIZED
Initial message state.
@ MSG_CFS_RECEIVED
Track that message has been received.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
#define PROBABILITY_FOR_NEW_ROUND
The estimated probability for a new round; this value is based on the bsc thesis of Elias Summerma...
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle an over message from a remote peer.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
MESSAGE_TYPE
Message types to track in message control flow.
@ DEMAND_MESSAGE
Demand message type.
@ OFFER_MESSAGE
Offer message type.
@ ELEMENT_MESSAGE
Element message type.
static enum GNUNET_GenericReturnValue check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates if a message is received in a correct phase.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static enum GNUNET_GenericReturnValue free_values_iter(void *cls, const struct GNUNET_HashCode *key, void *value)
static void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
GNUNET_SERVICE_MAIN(GNUNET_OS_project_data_gnunet(), "set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
#define LOG(kind,...)
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate if a message in differential sync has already been received before.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
MODE_OF_OPERATION
Different modes of operations.
@ DIFFERENTIAL_SYNC
Mode just synchronizes the difference between sets.
@ FULL_SYNC_LOCAL_SENDING_FIRST
Mode sends the full set, sending the local set first.
@ FULL_SYNC_REMOTE_SENDING_FIRST
Mode request full set from remote peer.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_EXPECT_IBF_LAST
Continuation for multi part IBFs.
@ PHASE_FULL_RECEIVING
Phase that receives full set first and then sends elements that are the local peer missing.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_FINISHED
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_PASSIVE_DECODING
The other peer is decoding the IBF we just sent.
@ PHASE_ACTIVE_DECODING
We are decoding an IBF.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
static unsigned int ibf_size
CADET service; establish channels to distant peers.
Constants for network protocols.
Two-peer set union operations.
API to create, modify and access statistics.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:897
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition: cadet_api.c:875
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:833
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected channel.
Definition: cadet_api.c:1081
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:966
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:777
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:804
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1030
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(const struct GNUNET_OS_ProjectData *pd)
Create a new configuration object.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Generate a random unsigned 64-bit value.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:218
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:62
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
There must only be one value per key, but don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
If a value with the given key exists, replace it.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_log(kind,...)
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
Definition: crypto_hash.c:363
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:54
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:37
void GNUNET_CRYPTO_hash_context_finish(struct GNUNET_HashContext *hc, struct GNUNET_HashCode *r_hash)
Finish the hash computation.
Definition: crypto_hash.c:387
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
Definition: crypto_hash.c:347
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
int GNUNET_snprintf(char *buf, size_t size, const char *format,...) __attribute__((format(printf
Like snprintf, just aborts if the buffer is of insufficient size.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:305
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:61
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:85
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:76
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_msg_header_extra(mh, esize, type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header and extra space.
Definition: gnunet_mq_lib.h:97
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
const struct GNUNET_OS_ProjectData * GNUNET_OS_project_data_gnunet(void)
Return default project data used by 'libgnunetutil' for GNUnet.
#define GNUNET_MESSAGE_TYPE_SETU_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST
Request a set union operation from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETU_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETU_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETU_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DONE
Set operation is done.
#define GNUNET_MESSAGE_TYPE_SETU_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OVER
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_OFFER
Tell the other peer which hashes match a given IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_DONE
Request all missing elements from the other peer, based on their sets and the elements we previously ...
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEC
Compressed strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_IBF
Invertible bloom filter.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_ELEMENTS
Actual set elements.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SEND_FULL
Signals other peer that all elements are sent.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_FULL_ELEMENT
Send a set element, not as response to a demand but because we're sending the full set.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_SE
Strata estimator.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_INQUIRY
Tell the other peer to send us a list of hashes that match an IBF key.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_P2P_REQUEST_FULL
Demand the whole element from the other peer, given only the hash code.
#define GNUNET_MESSAGE_TYPE_SETU_EVALUATE
Evaluate a set operation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:567
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1339
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:980
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1277
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2418
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2389
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETU_element_hash(const struct GNUNET_SETU_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: setu_api.c:890
GNUNET_SETU_Status
Status for the result callback.
@ GNUNET_SETU_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETU_STATUS_ADD_REMOTE
Element should be added to the result set of the remote peer, i.e.
@ GNUNET_SETU_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETU_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
static unsigned int size
Size of the "table".
Definition: peer.c:68
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
uint32_t number
uint8_t ibf_get_max_counter(struct InvertibleBloomFilter *ibf)
Returns the minimal bytes needed to store the counter of the IBF.
Definition: ibf.c:287
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
unsigned int generation
First generation that includes this element.
struct GNUNET_HashCode element_hash
Hash of the element.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:136
Handle to a client that is connected to a service.
Definition: service.c:249
Handle to a service.
Definition: service.c:116
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: setu.h:79
Sent to the service by the client in order to cancel a set operation.
Definition: setu.h:350
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: setu.h:41
Message sent by client to the service to add an element to the set.
Definition: setu.h:326
uint16_t element_type
Type of the element to add or remove.
Definition: setu.h:335
uint16_t reserved
For alignment, always zero.
Definition: setu.h:340
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_SETU_ADD.
Definition: setu.h:330
Element stored in a set.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: setu.h:203
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: setu.h:56
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: setu.h:158
A request for an operation with another client.
Definition: setu.h:175
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: setu.h:190
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: setu.h:185
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: setu.h:290
uint64_t current_size
Current set size.
Definition: setu.h:299
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETU_Status in NBO.
Definition: setu.h:310
uint16_t element_type
Type of the element attached to the message, if any.
Definition: setu.h:315
uint32_t request_id
ID of the request this result belongs to.
Definition: setu.h:304
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Context for op_get_element_iterator.
struct GNUNET_HashCode hash
FIXME.
struct KeyEntry * k
FIXME.
Message containing buckets of an invertible bloom filter.
Hash of an IBF key.
Definition: ibf.h:55
Keys that can be inserted into and removed from an IBF.
Definition: ibf.h:46
uint64_t key_val
Definition: ibf.h:47
estimate_best_mode_of_operation (uint64_t avg_element_size, uint64_t local_set_size,...
Invertible bloom filter (IBF).
Definition: ibf.h:83
int remote_decoded_count
If an IBF is decoded this count stores how many elements are on the remote site.
Definition: ibf.h:108
int local_decoded_count
If an IBF is decoded this count stores how many elements are on the local site.
Definition: ibf.h:101
uint32_t size
How many cells does this IBF have?
Definition: ibf.h:87
The key entry is used to associate an ibf key with an element.
struct ElementEntry * element
The actual element associated with the key.
struct IBF_Key ibf_key
IBF key for the entry, derived from the current salt.
int received
Did we receive this element? Even if element->is_foreign is false, we might have received the element...
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
struct StrataEstimator ** stratas
Array of strata estimators.
Operation context used to execute a set operation.
unsigned int generation_created
Generation in which the operation handle was created.
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
uint32_t salt_receive
Salt for the IBF we've received and that we're currently decoding.
struct GNUNET_CONTAINER_MultiHashMap * message_control_flow
Hashmap to keep track of the sent/received messages.
struct InvertibleBloomFilter * remote_ibf
The IBF we currently receive.
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
int force_delta
Always use delta operation instead of sending full sets, even if it's less efficient.
int force_full
Always send full sets, even if delta operations would be more efficient.
uint8_t ibf_bucket_number_factor
The set difference is multiplied by this factor to generate a large enough IBF.
uint64_t total_elements_size_local
Total size of local set.
uint64_t local_set_diff
Estimated or committed set difference at the start.
struct InvertibleBloomFilter * local_ibf
The IBF with the local set's element.
uint64_t ibf_buckets_received
Number of ibf buckets already received into the remote_ibf.
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
uint8_t mode_of_operation
Mode of operation that was chosen by the algorithm.
uint32_t salt_send
Salt that we're using for sending IBFs.
uint8_t differential_sync_iterations
Count of differential sync iterations already completed.
uint64_t rtt_bandwidth_tradeoff
User defined Bandwidth Round Trips Tradeoff.
struct GNUNET_CONTAINER_MultiHashMap * demanded_hashes
Hashes for elements that we have demanded from the other peer.
uint8_t ibf_number_buckets_per_element
Number of buckets per element in the IBF.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
bool active_passive_switch_required
Boolean to enforce an active passive switch.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
int client_done_sent
Did we send the client that we are done?
uint8_t peer_site
Defines which site the client is: 0 = initiating peer, 1 = receiving peer.
struct MultiStrataEstimator * se
Copy of the set's strata estimator at the time of creation of this operation.
enum UnionOperationPhase phase
Current state of the operation.
uint64_t local_element_count
Local peer element count.
int symmetric
GNUNET_YES to also send back set elements we are sending to the remote peer.
uint64_t byzantine_lower_bound
Lower bound for the set size, used only when byzantine mode is enabled.
struct Listener * listener
Port this operation runs on.
uint32_t received_fresh
Number of elements we received from the other peer that were not in the local set yet.
uint64_t initial_size
Initial size of our set, just before the operation started.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
uint32_t salt
Salt to use for the operation.
uint64_t byzantine_upper_bound
Limit of number of elements in set.
uint32_t remote_element_count
Remote peer's element count.
uint64_t remote_set_diff
Estimated or committed set difference at the start.
uint32_t client_request_id
ID used to identify an operation between service and client.
struct GNUNET_CONTAINER_MultiHashMap32 * key_to_element
Maps unsalted IBF-Keys to elements.
int byzantine
GNUNET_YES to fail operations where Byzantine faults are suspected
struct GNUNET_CONTAINER_MultiHashMap * inquiries_sent
Hashmap to keep track of the sent/received inquiries (IBF keys).
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
uint32_t received_total
Total number of elements received from the other peer.
Used as a closure for sending elements with a specific IBF key.
struct Operation * op
Operation for which the elements should be sent.
struct IBF_Key ibf_key
The IBF key whose matching elements should be sent.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
uint64_t elements_randomized_salt
Salt to construct the randomized element map.
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
struct GNUNET_CONTAINER_MultiHashMap * elements_randomized
Maps struct GNUNET_HashCode * to struct ElementEntry * randomized.
unsigned int refcount
Number of references to the content.
A set that supports a specific operation with other peers.
struct MultiStrataEstimator * se
The strata estimator is only generated once for each set.
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct Set * prev
Sets are held in a doubly linked list.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
struct ClientState * cs
Client that owns the set.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...
Strata estimator together with the peer's overall set size.
uint64_t set_size
Size of the local set.
uint8_t se_count
The number of strata estimators (SEs) transmitted.
struct InvertibleBloomFilter ** strata
The IBFs of this strata estimator.
unsigned int strata_count
Size of the IBF array in strata.
Message which signals to other peer that we are sending full set.
uint32_t local_set_difference
Local set difference calculated with strata estimator.
uint32_t remote_set_difference
Remote set difference calculated with strata estimator.
uint32_t remote_set_size
Total remote set size.
Struct to track messages in the message control flow.
enum MESSAGE_CONTROL_FLOW_STATE element
Track the message control state of the element message.
enum MESSAGE_CONTROL_FLOW_STATE offer
Track the message control state of the offer message.
enum MESSAGE_CONTROL_FLOW_STATE demand
Track the message control state of the demand message.