GNUnet 0.21.0
gnunet-service-setu.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
27#include "platform.h"
28#include "gnunet_util_lib.h"
30#include "ibf.h"
31#include "gnunet_protocols.h"
32#include "gnunet_applications.h"
37#include <gcrypt.h>
38#include "gnunet_setu_service.h"
39#include "setu.h"
40
41#define LOG(kind, ...) GNUNET_log_from (kind, "setu", __VA_ARGS__)
42
47#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
48
52#define SE_STRATA_COUNT 32
53
54
59#define SE_IBFS_TOTAL_SIZE 632
60
64#define SE_IBF_HASH_NUM 3
65
69#define MAX_BUCKETS_PER_MESSAGE ((1 << 16) / IBF_BUCKET_SIZE)
70
76#define MAX_IBF_SIZE 1048576
77
78
83#define IBF_MIN_SIZE 37
84
89#define DIFFERENTIAL_RTT_MEAN 3.65145
90
95#define SECURITY_LEVEL 80
96
102#define PROBABILITY_FOR_NEW_ROUND 0.15
103
108#define MEASURE_PERFORMANCE 0
109
110
115{
120
131
136
141
146
152
158
164
170
177
183{
188
193
199
200
207struct ElementEntry
208{
214
220
224 unsigned int generation;
225
230 int remote;
231};
232
233
238struct Listener;
239
240
244struct Set;
245
246
250struct ClientState
251{
255 struct Set *set;
256
260 struct Listener *listener;
261
266
270 struct GNUNET_MQ_Handle *mq;
271};
272
273
277struct Operation
278{
279
285
289 uint64_t initial_size;
290
294 struct Operation *next;
295
299 struct Operation *prev;
300
305
309 struct Listener *listener;
310
314 struct GNUNET_MQ_Handle *mq;
315
320
325 struct Set *set;
326
332
337
342
349
355
360
365
370
375
379 uint32_t salt_send;
380
384 uint32_t salt_receive;
385
391
396
400 uint32_t salt;
401
405 uint32_t remote_element_count;
406
410 uint32_t client_request_id;
411
416 int force_delta;
417
422 int force_full;
423
428 int byzantine;
429
435
441
447 uint32_t suggest_id;
448
453 unsigned int generation_created;
454
455
460
461
466
467
473
479 uint8_t peer_site;
480
481
486
491
496
501
502
507
512
517
522
527
532};
533
534
539struct SetContent
540{
545
550
555
559 unsigned int refcount;
560
564 unsigned int latest_generation;
565
569 int iterator_count;
570};
571
572
576struct Set
577{
581 struct Set *next;
582
586 struct Set *prev;
587
592 struct ClientState *cs;
593
598 struct SetContent *content;
599
605
609 struct Operation *ops_head;
610
614 struct Operation *ops_tail;
615
620 unsigned int current_generation;
621
622};
623
624
628struct KeyEntry
629{
633 struct IBF_Key ibf_key;
634
641 struct ElementEntry *element;
642
648 int received;
649};
650
651
657{
662 struct IBF_Key ibf_key;
663
668 struct Operation *op;
669};
670
671
676struct Listener
677{
681 struct Listener *next;
682
686 struct Listener *prev;
687
693 struct Operation *op_head;
694
700 struct Operation *op_tail;
701
706 struct ClientState *cs;
707
712
717 struct GNUNET_HashCode app_id;
718
719};
720
721
727
732
736static struct Listener *listener_head;
737
741static struct Listener *listener_tail;
742
746static unsigned int num_clients;
747
752static int in_shutdown;
753
759static uint32_t suggest_id;
760
761#if MEASURE_PERFORMANCE
766static const struct GNUNET_CONFIGURATION_Handle *setu_cfg;
767
768
774struct perf_num_send_received_msg
775{
776 uint64_t sent;
777 uint64_t sent_var_bytes;
778 uint64_t received;
779 uint64_t received_var_bytes;
780};
781
785struct per_store_struct
786{
787 struct perf_num_send_received_msg operation_request;
788 struct perf_num_send_received_msg se;
789 struct perf_num_send_received_msg request_full;
790 struct perf_num_send_received_msg element_full;
791 struct perf_num_send_received_msg full_done;
792 struct perf_num_send_received_msg ibf;
793 struct perf_num_send_received_msg inquery;
794 struct perf_num_send_received_msg element;
795 struct perf_num_send_received_msg demand;
796 struct perf_num_send_received_msg offer;
797 struct perf_num_send_received_msg done;
798 struct perf_num_send_received_msg over;
799 uint64_t se_diff;
800 uint64_t se_diff_remote;
801 uint64_t se_diff_local;
802 uint64_t active_passive_switches;
803 uint8_t mode_of_operation;
804};
805
806struct per_store_struct perf_store;
807#endif
808
814{
819
824
829
834};
835
841{
846
851
856};
857
858
863{
876};
877
878
879#if MEASURE_PERFORMANCE
880
886static void
887load_config (struct Operation *op)
888{
889 long long number;
890 float fl;
891
892 setu_cfg = GNUNET_CONFIGURATION_create ();
894 "perf_setu.conf");
896 "IBF",
897 "BUCKET_NUMBER_FACTOR",
898 &fl);
899 op->ibf_bucket_number_factor = fl;
901 "IBF",
902 "NUMBER_PER_BUCKET",
903 &number);
904 op->ibf_number_buckets_per_element = number;
906 "PERFORMANCE",
907 "TRADEOFF",
908 &number);
909 op->rtt_bandwidth_tradeoff = number;
911 "BOUNDARIES",
912 "UPPER_ELEMENT",
913 &number);
914 op->byzantine_upper_bound = number;
915 op->peer_site = 0;
916}
917
918
925static int
926sum_sent_received_bytes (uint64_t size,
927 struct perf_num_send_received_msg
928 perf_num_send_received_msg)
929{
930 return (size * perf_num_send_received_msg.sent)
931 + (size * perf_num_send_received_msg.received)
932 + perf_num_send_received_msg.sent_var_bytes
933 + perf_num_send_received_msg.received_var_bytes;
934}
935
936
940static void
941calculate_perf_store ()
942{
943
947 float rtt = 1;
948 int bytes_transmitted = 0;
949
953 if ((perf_store.element_full.received != 0) ||
954 (perf_store.element_full.sent != 0)
955 )
956 rtt += 1;
957
958 if ((perf_store.request_full.received != 0) ||
959 (perf_store.request_full.sent != 0)
960 )
961 rtt += 0.5;
962
967 if ((perf_store.element.received != 0) ||
968 (perf_store.element.sent != 0))
969 {
970 int iterations = perf_store.active_passive_switches;
971
972 if (iterations > 0)
973 rtt += iterations * 0.5;
974 rtt += 2.5;
975 }
976
977
981 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
983 perf_store.request_full);
984
985 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
987 perf_store.element_full);
988 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
990 perf_store.element);
991 // bytes_transmitted += sum_sent_received_bytes(sizeof(GNUNET_MESSAGE_TYPE_SETU_P2P_OPERATION_REQUEST), perf_store.operation_request);
992 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
994 perf_store.se);
995 bytes_transmitted += sum_sent_received_bytes (4, perf_store.full_done);
996 bytes_transmitted += sum_sent_received_bytes (sizeof(struct IBFMessage),
997 perf_store.ibf);
998 bytes_transmitted += sum_sent_received_bytes (sizeof(struct InquiryMessage),
999 perf_store.inquery);
1000 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1002 perf_store.demand);
1003 bytes_transmitted += sum_sent_received_bytes (sizeof(struct
1005 perf_store.offer);
1006 bytes_transmitted += sum_sent_received_bytes (4, perf_store.done);
1007
1011 float factor;
1012 GNUNET_CONFIGURATION_get_value_float (setu_cfg,"IBF", "BUCKET_NUMBER_FACTOR",
1013 &factor);
1014 long long num_per_bucket;
1015 GNUNET_CONFIGURATION_get_value_number (setu_cfg,"IBF", "NUMBER_PER_BUCKET",
1016 &num_per_bucket);
1017
1018
1019 int decoded = 0;
1020 if (perf_store.active_passive_switches == 0)
1021 decoded = 1;
1022 int ibf_bytes_transmitted = sum_sent_received_bytes (sizeof(struct
1023 IBFMessage),
1024 perf_store.ibf);
1025
1026 FILE *out1 = fopen ("perf_data.csv", "a");
1027 fprintf (out1, "%d,%f,%d,%d,%f,%d,%d,%d,%d,%d\n",num_per_bucket,factor,
1028 decoded,ibf_bytes_transmitted,rtt,perf_store.se_diff,
1029 bytes_transmitted,
1030 perf_store.se_diff_local,perf_store.se_diff_remote,
1031 perf_store.mode_of_operation);
1032 fclose (out1);
1033
1034}
1035
1036
1037#endif
1050static uint8_t
1051estimate_best_mode_of_operation (uint64_t avg_element_size,
1052 uint64_t local_set_size,
1053 uint64_t remote_set_size,
1054 uint64_t est_set_diff_remote,
1055 uint64_t est_set_diff_local,
1056 uint64_t bandwith_latency_tradeoff,
1057 uint64_t ibf_bucket_number_factor)
1058{
1059
1060 /*
1061 * In case of initial sync fall to predefined states
1062 */
1063
1064 if (0 == local_set_size)
1066 if (0 == remote_set_size)
1068
1069 /*
1070 * Calculate bytes for full Sync
1071 */
1072
1073 uint8_t sizeof_full_done_header = 4;
1074 uint8_t sizeof_done_header = 4;
1075 uint8_t rtt_min_full = 2;
1076 uint8_t sizeof_request_full = 4;
1077 uint64_t estimated_total_diff = (est_set_diff_remote + est_set_diff_local);
1078
1079 /* Estimate byte required if we send first */
1080 uint64_t total_elements_to_send_local_send_first = est_set_diff_remote
1081 + local_set_size;
1082
1083 uint64_t total_bytes_full_local_send_first = (avg_element_size
1084 *
1085 total_elements_to_send_local_send_first) \
1086 + (
1087 total_elements_to_send_local_send_first * sizeof(struct
1089 + (sizeof_full_done_header * 2) \
1090 + rtt_min_full
1091 * bandwith_latency_tradeoff;
1092
1093 /* Estimate bytes required if we request from remote peer */
1094 uint64_t total_elements_to_send_remote_send_first = est_set_diff_local
1095 + remote_set_size;
1096
1097 uint64_t total_bytes_full_remote_send_first = (avg_element_size
1098 *
1099 total_elements_to_send_remote_send_first) \
1100 + (
1101 total_elements_to_send_remote_send_first * sizeof(struct
1103 + (sizeof_full_done_header * 2) \
1104 + (rtt_min_full + 0.5)
1105 * bandwith_latency_tradeoff \
1106 + sizeof_request_full;
1107
1108 /*
1109 * Calculate bytes for differential Sync
1110 */
1111
1112 /* Estimate bytes required by IBF transmission*/
1113
1114 long double ibf_bucket_count = estimated_total_diff
1115 * ibf_bucket_number_factor;
1116
1117 if (ibf_bucket_count <= IBF_MIN_SIZE)
1118 {
1119 ibf_bucket_count = IBF_MIN_SIZE;
1120 }
1121 uint64_t ibf_message_count = ceil ( ((float) ibf_bucket_count)
1122 / ((float) MAX_BUCKETS_PER_MESSAGE));
1123
1124 uint64_t estimated_counter_size = ceil (
1125 MIN (2 * log2l (((float) local_set_size)
1126 / ((float) ibf_bucket_count)),
1127 log2l (local_set_size)));
1128
1129 long double counter_bytes = (float) estimated_counter_size / 8;
1130
1131 uint64_t ibf_bytes = ceil ((sizeof (struct IBFMessage) * ibf_message_count)
1132 * 1.2 \
1133 + (ibf_bucket_count * sizeof(struct IBF_Key)) * 1.2 \
1134 + (ibf_bucket_count * sizeof(struct IBF_KeyHash))
1135 * 1.2 \
1136 + (ibf_bucket_count * counter_bytes) * 1.2);
1137
1138 /* Estimate full byte count for differential sync */
1139 uint64_t element_size = (avg_element_size
1140 + sizeof (struct GNUNET_SETU_ElementMessage)) \
1141 * estimated_total_diff;
1142 uint64_t done_size = sizeof_done_header;
1143 uint64_t inquery_size = (sizeof (struct IBF_Key)
1144 + sizeof (struct InquiryMessage))
1145 * estimated_total_diff;
1146 uint64_t demand_size =
1147 (sizeof(struct GNUNET_HashCode) + sizeof(struct GNUNET_MessageHeader))
1148 * estimated_total_diff;
1149 uint64_t offer_size = (sizeof (struct GNUNET_HashCode)
1150 + sizeof (struct GNUNET_MessageHeader))
1151 * estimated_total_diff;
1152
1153 uint64_t total_bytes_diff = (element_size + done_size + inquery_size
1154 + demand_size + offer_size + ibf_bytes) \
1156 * bandwith_latency_tradeoff);
1157
1158 uint64_t full_min = MIN (total_bytes_full_local_send_first,
1159 total_bytes_full_remote_send_first);
1160
1161 /* Decide between full and differential sync */
1162
1163 if (full_min < total_bytes_diff)
1164 {
1165 /* Decide between sending all element first or receiving all elements */
1166 if (total_bytes_full_remote_send_first > total_bytes_full_local_send_first)
1167 {
1169 }
1170 else
1171 {
1173 }
1174 }
1175 else
1176 {
1177 return DIFFERENTIAL_SYNC;
1178 }
1179}
1180
1181
1190static enum GNUNET_GenericReturnValue
1191check_valid_phase (const uint8_t allowed_phases[],
1192 size_t size_phases,
1193 struct Operation *op)
1194{
1198 for (uint32_t phase_ctr = 0; phase_ctr < size_phases; phase_ctr++)
1199 {
1200 uint8_t phase = allowed_phases[phase_ctr];
1201 if (phase == op->phase)
1202 {
1204 "Message received in valid phase\n");
1205 return GNUNET_YES;
1206 }
1207 }
1209 "Received message in invalid phase: %u\n", op->phase);
1210 return GNUNET_NO;
1211}
1212
1213
1225static int
1227 enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
1228 const struct GNUNET_HashCode *hash_code,
1229 enum MESSAGE_TYPE mt)
1230{
1231 struct messageControlFlowElement *cfe = NULL;
1232 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1233
1238 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1239 if ((ELEMENT_MESSAGE == mt) && (cfe != NULL))
1240 {
1241 if ((new_mcfs != MSG_CFS_SENT) && (MSG_CFS_RECEIVED != cfe->offer))
1242 {
1244 "Received an element without sent offer!\n");
1245 return GNUNET_NO;
1246 }
1247 /* Check that only requested elements are received! */
1248 if ((ELEMENT_MESSAGE == mt) && (new_mcfs != MSG_CFS_SENT) && (cfe->demand !=
1249 MSG_CFS_SENT))
1250 {
1252 "Received an element that was not demanded\n");
1253 return GNUNET_NO;
1254 }
1255 }
1256
1261 if (NULL == cfe)
1262 {
1264 if (GNUNET_SYSERR == GNUNET_CONTAINER_multihashmap_put (hash_map, hash_code,
1265 cfe,
1267 {
1268 GNUNET_free (cfe);
1269 return GNUNET_SYSERR;
1270 }
1271 }
1272
1277 if (OFFER_MESSAGE == mt)
1278 {
1279 mcfs = &cfe->offer;
1280 }
1281 else if (DEMAND_MESSAGE == mt)
1282 {
1283 mcfs = &cfe->demand;
1284 }
1285 else if (ELEMENT_MESSAGE == mt)
1286 {
1287 mcfs = &cfe->element;
1288 }
1289 else
1290 {
1291 return GNUNET_SYSERR;
1292 }
1293
1298 if (new_mcfs <= *mcfs)
1299 {
1300 return GNUNET_NO;
1301 }
1302
1303 *mcfs = new_mcfs;
1304 return GNUNET_YES;
1305}
1306
1307
1315static int
1318 struct GNUNET_HashCode *hash_code,
1319 enum MESSAGE_TYPE mt)
1320{
1321 struct messageControlFlowElement *cfe = NULL;
1322 enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
1323
1324 cfe = GNUNET_CONTAINER_multihashmap_get (hash_map, hash_code);
1325
1330 if (cfe != NULL)
1331 {
1332 if (OFFER_MESSAGE == mt)
1333 {
1334 mcfs = &cfe->offer;
1335 }
1336 else if (DEMAND_MESSAGE == mt)
1337 {
1338 mcfs = &cfe->demand;
1339 }
1340 else if (ELEMENT_MESSAGE == mt)
1341 {
1342 mcfs = &cfe->element;
1343 }
1344 else
1345 {
1346 return GNUNET_SYSERR;
1347 }
1348
1352 if (*mcfs != MSG_CFS_UNINITIALIZED)
1353 {
1354 return GNUNET_YES;
1355 }
1356 }
1357 return GNUNET_NO;
1358}
1359
1360
1371static int
1373 const struct GNUNET_HashCode *key,
1374 void *value)
1375{
1376 struct messageControlFlowElement *mcfe = value;
1377
1378 if (((mcfe->element == MSG_CFS_SENT) || (mcfe->element == MSG_CFS_RECEIVED) ))
1379 {
1380 return GNUNET_YES;
1381 }
1382 return GNUNET_NO;
1383}
1384
1385
1395static int
1397 const struct GNUNET_HashCode *key,
1398 void *value)
1399{
1400 struct Operation *op = cls;
1401 struct GNUNET_SETU_Element *element = value;
1402 op->total_elements_size_local += element->size;
1403 return GNUNET_YES;
1404}
1405
1406
1416static int
1418 const struct GNUNET_HashCode *key,
1419 void *value)
1420{
1421 struct Operation *op = cls;
1422
1423 struct GNUNET_HashContext *hashed_key_context =
1425 struct GNUNET_HashCode new_key;
1426
1430 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1431 &key,
1432 sizeof(struct IBF_Key));
1433 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
1434 &op->set->content->elements_randomized_salt,
1435 sizeof(uint32_t));
1436 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
1437 &new_key);
1438 GNUNET_CONTAINER_multihashmap_put (op->set->content->elements_randomized,
1439 &new_key,value,
1441 return GNUNET_YES;
1442}
1443
1444
1455static int
1457 uint32_t key,
1458 void *value)
1459{
1460 struct KeyEntry *k = value;
1461
1462 GNUNET_assert (NULL != k);
1463 if (GNUNET_YES == k->element->remote)
1464 {
1465 GNUNET_free (k->element);
1466 k->element = NULL;
1467 }
1468 GNUNET_free (k);
1469 return GNUNET_YES;
1470}
1471
1472
1479static void
1481{
1482 struct Operation *op = cls;
1483 struct GNUNET_MQ_Envelope *ev;
1484 struct GNUNET_SETU_ResultMessage *rm;
1485
1486 if (GNUNET_YES == op->client_done_sent)
1487 return;
1488 if (PHASE_FINISHED != op->phase)
1489 {
1491 "Union operation failed\n");
1493 "# Union operations failed",
1494 1,
1495 GNUNET_NO);
1498 rm->request_id = htonl (op->client_request_id);
1499 rm->element_type = htons (0);
1500 GNUNET_MQ_send (op->set->cs->mq,
1501 ev);
1502 return;
1503 }
1504
1505 op->client_done_sent = GNUNET_YES;
1506
1508 "# Union operations succeeded",
1509 1,
1510 GNUNET_NO);
1512 "Signalling client that union operation is done\n");
1513 ev = GNUNET_MQ_msg (rm,
1515 rm->request_id = htonl (op->client_request_id);
1517 rm->element_type = htons (0);
1519 op->key_to_element));
1520 GNUNET_MQ_send (op->set->cs->mq,
1521 ev);
1522}
1523
1524
1531static int
1533{
1534 if (op->byzantine != GNUNET_YES)
1535 return GNUNET_OK;
1536
1540 if (op->remote_element_count + op->remote_set_diff >
1541 op->byzantine_upper_bound)
1542 return GNUNET_SYSERR;
1543 if (op->local_element_count + op->local_set_diff > op->byzantine_upper_bound)
1544 return GNUNET_SYSERR;
1545
1549 if (op->remote_element_count < op->byzantine_lower_bound)
1550 return GNUNET_SYSERR;
1551 return GNUNET_OK;
1552}
1553
1554
1555static enum GNUNET_GenericReturnValue
1557 const struct GNUNET_HashCode *key,
1558 void *value)
1559{
1561 return GNUNET_YES;
1562}
1563
1564
1565/* FIXME: the destroy logic is a mess and should be cleaned up! */
1566
1579static void
1581{
1582 struct Set *set = op->set;
1583 struct GNUNET_CADET_Channel *channel;
1584
1586 "Destroying union operation %p\n",
1587 op);
1588 GNUNET_assert (NULL == op->listener);
1589 /* check if the op was canceled twice */
1590 if (NULL != op->remote_ibf)
1591 {
1592 ibf_destroy (op->remote_ibf);
1593 op->remote_ibf = NULL;
1594 }
1595 if (NULL != op->demanded_hashes)
1596 {
1597 GNUNET_CONTAINER_multihashmap_destroy (op->demanded_hashes);
1598 op->demanded_hashes = NULL;
1599 }
1600 if (NULL != op->local_ibf)
1601 {
1602 ibf_destroy (op->local_ibf);
1603 op->local_ibf = NULL;
1604 }
1605 if (NULL != op->se)
1606 {
1608 op->se = NULL;
1609 }
1610 if (NULL != op->key_to_element)
1611 {
1614 NULL);
1616 op->key_to_element = NULL;
1617 }
1618 if (NULL != op->message_control_flow)
1619 {
1620 GNUNET_CONTAINER_multihashmap_iterate (op->message_control_flow,
1622 NULL);
1623 GNUNET_CONTAINER_multihashmap_destroy (op->message_control_flow);
1624 op->message_control_flow = NULL;
1625 }
1626 if (NULL != op->inquiries_sent)
1627 {
1628 GNUNET_CONTAINER_multihashmap_destroy (op->inquiries_sent);
1629 op->inquiries_sent = NULL;
1630 }
1631 if (NULL != set)
1632 {
1634 set->ops_tail,
1635 op);
1636 op->set = NULL;
1637 }
1638 if (NULL != op->context_msg)
1639 {
1640 GNUNET_free (op->context_msg);
1641 op->context_msg = NULL;
1642 }
1643 if (NULL != (channel = op->channel))
1644 {
1645 /* This will free op; called conditionally as this helper function
1646 is also called from within the channel disconnect handler. */
1647 op->channel = NULL;
1649 }
1650 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
1651 * there was a channel end handler that will free 'op' on the call stack. */
1652}
1653
1654
1660static void
1662
1663
1669static void
1671{
1672 struct Listener *listener;
1673
1675 "Destroying incoming operation %p\n",
1676 op);
1677 if (NULL != (listener = op->listener))
1678 {
1680 listener->op_tail,
1681 op);
1682 op->listener = NULL;
1683 }
1684 if (NULL != op->timeout_task)
1685 {
1686 GNUNET_SCHEDULER_cancel (op->timeout_task);
1687 op->timeout_task = NULL;
1688 }
1690}
1691
1692
1698static void
1700{
1701 struct GNUNET_CADET_Channel *channel;
1702
1703 if (NULL != (channel = op->channel))
1704 {
1705 /* This will free op; called conditionally as this helper function
1706 is also called from within the channel disconnect handler. */
1707 op->channel = NULL;
1709 }
1710 if (NULL != op->listener)
1711 {
1713 return;
1714 }
1715 if (NULL != op->set)
1718 GNUNET_free (op);
1719}
1720
1721
1728static void
1730{
1731 struct GNUNET_MQ_Envelope *ev;
1733
1735 "union operation failed\n");
1737 msg->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
1738 msg->request_id = htonl (op->client_request_id);
1739 msg->element_type = htons (0);
1740 GNUNET_MQ_send (op->set->cs->mq,
1741 ev);
1743}
1744
1745
1756static void
1758{
1759 if (GNUNET_YES != op->byzantine)
1760 return;
1761
1762 int security_level_lb = -1 * SECURITY_LEVEL;
1763 uint64_t duplicates = op->received_fresh - op->received_total;
1764
1765 /*
1766 * Protect full sync from receiving double element when in FULL SENDING
1767 */
1768 if (PHASE_FULL_SENDING == op->phase)
1769 {
1770 if (duplicates > 0)
1771 {
1773 "PROTOCOL VIOLATION: Received duplicate element in full receiving "
1774 "mode of operation this is not allowed! Duplicates: %llu\n",
1775 (unsigned long long) duplicates);
1776 GNUNET_break_op (0);
1778 return;
1779 }
1780
1781 }
1782
1783 /*
1784 * Protect full sync with probabilistic algorithm
1785 */
1786 if (PHASE_FULL_RECEIVING == op->phase)
1787 {
1788 if (0 == op->remote_set_diff)
1789 op->remote_set_diff = 1;
1790
1791 long double base = (1 - (long double) (op->remote_set_diff
1792 / (long double) (op->initial_size
1793 + op->
1794 remote_set_diff)));
1795 long double exponent = (op->received_total - (op->received_fresh * ((long
1796 double)
1797 op->
1798 initial_size
1799 / (long
1800 double)
1801 op->
1802 remote_set_diff)));
1803 long double value = exponent * (log2l (base) / log2l (2));
1804 if ((value < security_level_lb) || (value > SECURITY_LEVEL) )
1805 {
1807 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for receiving "
1808 "to many duplicated full element : %LF\n",
1809 value);
1810 GNUNET_break_op (0);
1812 return;
1813 }
1814 }
1815}
1816
1817
1822static void
1824{
1825 double probability = op->differential_sync_iterations * (log2l (
1827 / log2l (2));
1828 if ((-1 * SECURITY_LEVEL) > probability)
1829 {
1831 "PROTOCOL VIOLATION: Other peer violated probabilistic rule for to many active passive "
1832 "switches in differential sync: %u\n",
1833 op->differential_sync_iterations);
1834 GNUNET_break_op (0);
1836 return;
1837 }
1838}
1839
1840
1848static struct IBF_Key
1849get_ibf_key (const struct GNUNET_HashCode *src)
1850{
1851 struct IBF_Key key;
1852 uint16_t salt = 0;
1853
1855 GNUNET_CRYPTO_kdf (&key, sizeof(key),
1856 src, sizeof *src,
1857 &salt, sizeof(salt),
1858 NULL, 0));
1859 return key;
1860}
1861
1862
1866struct GetElementContext
1867{
1871 struct GNUNET_HashCode hash;
1872
1876 struct KeyEntry *k;
1877};
1878
1879
1890static int
1892 uint32_t key,
1893 void *value)
1894{
1895 struct GetElementContext *ctx = cls;
1896 struct KeyEntry *k = value;
1897
1898 GNUNET_assert (NULL != k);
1900 &ctx->hash))
1901 {
1902 ctx->k = k;
1903 return GNUNET_NO;
1904 }
1905 return GNUNET_YES;
1906}
1907
1908
1917static struct KeyEntry *
1919 const struct GNUNET_HashCode *element_hash)
1920{
1921 int ret;
1922 struct IBF_Key ibf_key;
1923 struct GetElementContext ctx = { { { 0 } }, 0 };
1924
1925 ctx.hash = *element_hash;
1926
1927 ibf_key = get_ibf_key (element_hash);
1929 (uint32_t) ibf_key.key_val,
1931 &ctx);
1932
1933 /* was the iteration aborted because we found the element? */
1934 if (GNUNET_SYSERR == ret)
1935 {
1936 GNUNET_assert (NULL != ctx.k);
1937 return ctx.k;
1938 }
1939 return NULL;
1940}
1941
1942
1957static void
1959 struct ElementEntry *ee,
1960 int received)
1961{
1962 struct IBF_Key ibf_key;
1963 struct KeyEntry *k;
1964
1966 k = GNUNET_new (struct KeyEntry);
1967 k->element = ee;
1968 k->ibf_key = ibf_key;
1969 k->received = received;
1971 GNUNET_CONTAINER_multihashmap32_put (op->key_to_element,
1972 (uint32_t) ibf_key.key_val,
1973 k,
1975}
1976
1977
1982static void
1983salt_key (const struct IBF_Key *k_in,
1984 uint32_t salt,
1985 struct IBF_Key *k_out)
1986{
1987 int s = (salt * 7) % 64;
1988 uint64_t x = k_in->key_val;
1989
1990 /* rotate ibf key */
1991 x = (x >> s) | (x << (64 - s));
1992 k_out->key_val = x;
1993}
1994
1995
1999static void
2000unsalt_key (const struct IBF_Key *k_in,
2001 uint32_t salt,
2002 struct IBF_Key *k_out)
2003{
2004 int s = (salt * 7) % 64;
2005 uint64_t x = k_in->key_val;
2006
2007 x = (x << s) | (x >> (64 - s));
2008 k_out->key_val = x;
2009}
2010
2011
2019static int
2021 uint32_t key,
2022 void *value)
2023{
2024 struct Operation *op = cls;
2025 struct KeyEntry *ke = value;
2026 struct IBF_Key salted_key;
2027
2029 "[OP %p] inserting %lx (hash %s) into ibf\n",
2030 op,
2031 (unsigned long) ke->ibf_key.key_val,
2033 salt_key (&ke->ibf_key,
2034 op->salt_send,
2035 &salted_key);
2036 ibf_insert (op->local_ibf, salted_key);
2037 return GNUNET_YES;
2038}
2039
2040
2048static int
2050 struct Operation *op)
2051{
2052 return ee->generation >= op->generation_created;
2053}
2054
2055
2066static int
2068 const struct GNUNET_HashCode *key,
2069 void *value)
2070{
2071 struct Operation *op = cls;
2072 struct ElementEntry *ee = value;
2073
2074 /* make sure that the element belongs to the set at the time
2075 * of creating the operation */
2076 if (GNUNET_NO ==
2078 op))
2079 return GNUNET_YES;
2082 ee,
2083 GNUNET_NO);
2084 return GNUNET_YES;
2085}
2086
2087
2093static void
2095{
2096 unsigned int len;
2097
2098 GNUNET_assert (NULL == op->key_to_element);
2099 len = GNUNET_CONTAINER_multihashmap_size (op->set->content->elements);
2100 op->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
2101 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2103 op);
2104}
2105
2106
2115static int
2117 uint32_t size)
2118{
2119 GNUNET_assert (NULL != op->key_to_element);
2120
2121 if (NULL != op->local_ibf)
2122 ibf_destroy (op->local_ibf);
2123 // op->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
2124 op->local_ibf = ibf_create (size,
2125 ((uint8_t) op->ibf_number_buckets_per_element));
2126 if (NULL == op->local_ibf)
2127 {
2129 "Failed to allocate local IBF\n");
2130 return GNUNET_SYSERR;
2131 }
2134 op);
2135 return GNUNET_OK;
2136}
2137
2138
2148static int
2150 uint32_t ibf_size)
2151{
2152 uint64_t buckets_sent = 0;
2153 struct InvertibleBloomFilter *ibf;
2154 op->differential_sync_iterations++;
2155
2159 uint32_t ibf_min_size = IBF_MIN_SIZE;
2160
2161 if (ibf_size < ibf_min_size)
2162 {
2163 ibf_size = ibf_min_size;
2164 }
2165 if (GNUNET_OK !=
2167 {
2168 /* allocation failed */
2169 return GNUNET_SYSERR;
2170 }
2171
2173 "sending ibf of size %u\n",
2174 (unsigned int) ibf_size);
2175
2176 {
2177 char name[64];
2178
2180 sizeof(name),
2181 "# sent IBF (order %u)",
2182 ibf_size);
2184 }
2185
2186 ibf = op->local_ibf;
2187
2188 while (buckets_sent < ibf_size)
2189 {
2190 unsigned int buckets_in_message;
2191 struct GNUNET_MQ_Envelope *ev;
2192 struct IBFMessage *msg;
2193
2194 buckets_in_message = ibf_size - buckets_sent;
2195 /* limit to maximum */
2196 if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
2197 buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
2198
2199#if MEASURE_PERFORMANCE
2200 perf_store.ibf.sent += 1;
2201 perf_store.ibf.sent_var_bytes += (buckets_in_message * IBF_BUCKET_SIZE);
2202#endif
2204 buckets_in_message * IBF_BUCKET_SIZE,
2206 msg->ibf_size = ibf_size;
2207 msg->offset = htonl (buckets_sent);
2208 msg->salt = htonl (op->salt_send);
2209 msg->ibf_counter_bit_length = ibf_get_max_counter (ibf);
2210
2211
2212 ibf_write_slice (ibf, buckets_sent,
2213 buckets_in_message, &msg[1], msg->ibf_counter_bit_length);
2214 buckets_sent += buckets_in_message;
2216 "ibf chunk size %u, %llu/%u sent\n",
2217 (unsigned int) buckets_in_message,
2218 (unsigned long long) buckets_sent,
2219 (unsigned int) ibf_size);
2220 GNUNET_MQ_send (op->mq, ev);
2221 }
2222
2223 /* The other peer must decode the IBF, so
2224 * we're passive. */
2225 op->phase = PHASE_PASSIVE_DECODING;
2226 return GNUNET_OK;
2227}
2228
2229
2237static unsigned int
2238get_size_from_difference (unsigned int diff, int number_buckets_per_element,
2239 float ibf_bucket_number_factor)
2240{
2243 return (((int) (diff * ibf_bucket_number_factor)) | 1);
2244
2245}
2246
2247
2248static unsigned int
2249get_next_ibf_size (float ibf_bucket_number_factor, unsigned int
2250 decoded_elements, unsigned int last_ibf_size)
2251{
2252 unsigned int next_size = (unsigned int) ((last_ibf_size * 2)
2253 - (ibf_bucket_number_factor
2254 * decoded_elements));
2257 return next_size | 1;
2258}
2259
2260
2270static int
2272 const struct GNUNET_HashCode *key,
2273 void *value)
2274{
2275 struct Operation *op = cls;
2276 struct GNUNET_SETU_ElementMessage *emsg;
2277 struct ElementEntry *ee = value;
2278 struct GNUNET_SETU_Element *el = &ee->element;
2279 struct GNUNET_MQ_Envelope *ev;
2280
2282 "Sending element %s\n",
2283 GNUNET_h2s (key));
2284#if MEASURE_PERFORMANCE
2285 perf_store.element_full.received += 1;
2286 perf_store.element_full.received_var_bytes += el->size;
2287#endif
2288 ev = GNUNET_MQ_msg_extra (emsg,
2289 el->size,
2291 emsg->element_type = htons (el->element_type);
2292 GNUNET_memcpy (&emsg[1],
2293 el->data,
2294 el->size);
2295 GNUNET_MQ_send (op->mq,
2296 ev);
2297 return GNUNET_YES;
2298}
2299
2300
2306static void
2308{
2309 struct GNUNET_MQ_Envelope *ev;
2310
2311 op->phase = PHASE_FULL_SENDING;
2313 "Dedicing to transmit the full set\n");
2314 /* FIXME: use a more memory-friendly way of doing this with an
2315 iterator, just as we do in the non-full case! */
2316
2317 // Randomize Elements to send
2318 op->set->content->elements_randomized = GNUNET_CONTAINER_multihashmap_create (
2319 32,GNUNET_NO);
2320 op->set->content->elements_randomized_salt = GNUNET_CRYPTO_random_u64 (
2322 UINT64_MAX);
2323 (void) GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2324 &
2326 op);
2327
2329 op->set->content->elements_randomized,
2331 op);
2332#if MEASURE_PERFORMANCE
2333 perf_store.full_done.sent += 1;
2334#endif
2336 GNUNET_MQ_send (op->mq,
2337 ev);
2338}
2339
2340
2347static int
2349 const struct StrataEstimatorMessage *msg)
2350{
2351 struct Operation *op = cls;
2352 int is_compressed;
2353 size_t len;
2354
2355 if (op->phase != PHASE_EXPECT_SE)
2356 {
2357 GNUNET_break (0);
2358 return GNUNET_SYSERR;
2359 }
2360 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2361 msg->header.type));
2362 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2363 if ((GNUNET_NO == is_compressed) &&
2365 {
2366 GNUNET_break (0);
2367 return GNUNET_SYSERR;
2368 }
2369 return GNUNET_OK;
2370}
2371
2372
2379static void
2381 const struct StrataEstimatorMessage *msg)
2382{
2383#if MEASURE_PERFORMANCE
2384 perf_store.se.received += 1;
2385 perf_store.se.received_var_bytes += ntohs (msg->header.size) - sizeof(struct
2387#endif
2388 struct Operation *op = cls;
2389 struct MultiStrataEstimator *remote_se;
2390 unsigned int diff;
2391 uint64_t other_size;
2392 size_t len;
2393 int is_compressed;
2394 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
2395 op->set->content->elements);
2396 // Setting peer site to receiving peer
2397 op->peer_site = 1;
2398
2402 uint8_t allowed_phases[] = {PHASE_EXPECT_SE};
2403 if (GNUNET_OK !=
2404 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2405 {
2406 GNUNET_break (0);
2408 return;
2409 }
2410
2412 if ((msg->se_count > 8) || (__builtin_popcount ((int) msg->se_count) != 1))
2413 {
2415 "PROTOCOL VIOLATION: Invalid number of se transmitted by other peer %u\n",
2416 msg->se_count);
2417 GNUNET_break_op (0);
2419 return;
2420 }
2421
2422 is_compressed = (GNUNET_MESSAGE_TYPE_SETU_P2P_SEC == htons (
2423 msg->header.type));
2425 "# bytes of SE received",
2426 ntohs (msg->header.size),
2427 GNUNET_NO);
2428 len = ntohs (msg->header.size) - sizeof(struct StrataEstimatorMessage);
2429 other_size = GNUNET_ntohll (msg->set_size);
2430 op->remote_element_count = other_size;
2431
2432 if (op->byzantine_upper_bound < op->remote_element_count)
2433 {
2435 "Exceeded configured upper bound <%"PRIu64"> of element: %u\n",
2436 op->byzantine_upper_bound,
2437 op->remote_element_count);
2439 return;
2440 }
2441
2445 if (NULL == remote_se)
2446 {
2447 /* insufficient resources, fail */
2449 return;
2450 }
2451 if (GNUNET_OK !=
2453 len,
2454 is_compressed,
2455 msg->se_count,
2457 remote_se))
2458 {
2459 /* decompression failed */
2460 strata_estimator_destroy (remote_se);
2462 return;
2463 }
2464 GNUNET_assert (NULL != op->se);
2465 strata_estimator_difference (remote_se,
2466 op->se);
2467
2468 /* Calculate remote local diff */
2469 long diff_remote = remote_se->stratas[0]->strata[0]->remote_decoded_count;
2470 long diff_local = remote_se->stratas[0]->strata[0]->local_decoded_count;
2471
2472 /* Prevent estimations from overshooting max element */
2473 if (diff_remote + op->remote_element_count > op->byzantine_upper_bound)
2474 diff_remote = op->byzantine_upper_bound - op->remote_element_count;
2475 if (diff_local + op->local_element_count > op->byzantine_upper_bound)
2476 diff_local = op->byzantine_upper_bound - op->local_element_count;
2477 if ((diff_remote < 0) || (diff_local < 0))
2478 {
2479 strata_estimator_destroy (remote_se);
2481 "PROTOCOL VIOLATION: More element is set as upper boundary or other peer is "
2482 "malicious: remote diff %ld, local diff: %ld\n",
2483 diff_remote, diff_local);
2484 GNUNET_break_op (0);
2486 return;
2487 }
2488
2489 /* Make estimation more precise in initial sync cases */
2490 if (0 == op->remote_element_count)
2491 {
2492 diff_remote = 0;
2493 diff_local = op->local_element_count;
2494 }
2495 if (0 == op->local_element_count)
2496 {
2497 diff_local = 0;
2498 diff_remote = op->remote_element_count;
2499 }
2500
2501 diff = diff_remote + diff_local;
2502 op->remote_set_diff = diff_remote;
2503
2505 uint64_t avg_element_size = 0;
2506 if (0 < op->local_element_count)
2507 {
2508 op->total_elements_size_local = 0;
2509 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
2510 &
2512 op);
2513 avg_element_size = op->total_elements_size_local / op->local_element_count;
2514 }
2515
2516 op->mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
2518 op->set->content->
2519 elements),
2520 op->
2521 remote_element_count,
2522 diff_remote,
2523 diff_local,
2524 op->
2525 rtt_bandwidth_tradeoff,
2526 op->
2527 ibf_bucket_number_factor);
2528
2529#if MEASURE_PERFORMANCE
2530 perf_store.se_diff_local = diff_local;
2531 perf_store.se_diff_remote = diff_remote;
2532 perf_store.se_diff = diff;
2533 perf_store.mode_of_operation = op->mode_of_operation;
2534#endif
2535
2536 strata_estimator_destroy (remote_se);
2538 op->se = NULL;
2540 "got se diff=%d, using ibf size %d\n",
2541 diff,
2542 1U << get_size_from_difference (diff, op->ibf_number_buckets_per_element,
2543 op->ibf_bucket_number_factor));
2544
2545 {
2546 char *set_debug;
2547
2548 set_debug = getenv ("GNUNET_SETU_BENCHMARK");
2549 if ((NULL != set_debug) &&
2550 (0 == strcmp (set_debug, "1")))
2551 {
2552 FILE *f = fopen ("set.log", "a");
2553 fprintf (f, "%llu\n", (unsigned long long) diff);
2554 fclose (f);
2555 }
2556 }
2557
2558 if ((GNUNET_YES == op->byzantine) &&
2559 (other_size < op->byzantine_lower_bound))
2560 {
2561 GNUNET_break (0);
2563 return;
2564 }
2565
2566 if ((GNUNET_YES == op->force_full) ||
2567 (op->mode_of_operation != DIFFERENTIAL_SYNC))
2568 {
2570 "Deciding to go for full set transmission (diff=%d, own set=%llu)\n",
2571 diff,
2572 (unsigned long long) op->initial_size);
2574 "# of full sends",
2575 1,
2576 GNUNET_NO);
2577 if (FULL_SYNC_LOCAL_SENDING_FIRST == op->mode_of_operation)
2578 {
2579 struct TransmitFullMessage *signal_msg;
2580 struct GNUNET_MQ_Envelope *ev;
2581 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2583 signal_msg->remote_set_difference = htonl (diff_local);
2584 signal_msg->remote_set_size = htonl (op->local_element_count);
2585 signal_msg->local_set_difference = htonl (diff_remote);
2586 GNUNET_MQ_send (op->mq,
2587 ev);
2588 send_full_set (op);
2589 }
2590 else
2591 {
2592 struct GNUNET_MQ_Envelope *ev;
2593
2595 "Telling other peer that we expect its full set\n");
2596 op->phase = PHASE_FULL_RECEIVING;
2597#if MEASURE_PERFORMANCE
2598 perf_store.request_full.sent += 1;
2599#endif
2600 struct TransmitFullMessage *signal_msg;
2601 ev = GNUNET_MQ_msg_extra (signal_msg,sizeof(struct TransmitFullMessage),
2603 signal_msg->remote_set_difference = htonl (diff_local);
2604 signal_msg->remote_set_size = htonl (op->local_element_count);
2605 signal_msg->local_set_difference = htonl (diff_remote);
2606 GNUNET_MQ_send (op->mq,
2607 ev);
2608 }
2609 }
2610 else
2611 {
2613 "# of ibf sends",
2614 1,
2615 GNUNET_NO);
2616 if (GNUNET_OK !=
2617 send_ibf (op,
2619 op->ibf_number_buckets_per_element,
2620 op->ibf_bucket_number_factor)))
2621 {
2622 /* Internal error, best we can do is shut the connection */
2624 "Failed to send IBF, closing connection\n");
2626 return;
2627 }
2628 }
2629 GNUNET_CADET_receive_done (op->channel);
2630}
2631
2632
2640static int
2642 uint32_t key,
2643 void *value)
2644{
2645 struct SendElementClosure *sec = cls;
2646 struct Operation *op = sec->op;
2647 struct KeyEntry *ke = value;
2648 struct GNUNET_MQ_Envelope *ev;
2649 struct GNUNET_MessageHeader *mh;
2650
2651 /* Detect 32-bit key collision for the 64-bit IBF keys. */
2652 if (ke->ibf_key.key_val != sec->ibf_key.key_val)
2653 {
2654 op->active_passive_switch_required = true;
2655 return GNUNET_YES;
2656 }
2657
2658 /* Prevent implementation from sending a offer multiple times in case of roll switch */
2659 if (GNUNET_YES ==
2661 op->message_control_flow,
2662 &ke->element->element_hash,
2664 )
2665 {
2667 "Skipping already sent processed element offer!\n");
2668 return GNUNET_YES;
2669 }
2670
2671 /* Save send offer message for message control */
2672 if (GNUNET_YES !=
2674 op->message_control_flow,
2676 &ke->element->element_hash,
2678 )
2679 {
2681 "Double offer message sent found!\n");
2682 GNUNET_break (0);
2684 return GNUNET_NO;
2685 }
2686 ;
2687
2688 /* Mark element to be expected to received */
2689 if (GNUNET_YES !=
2691 op->message_control_flow,
2693 &ke->element->element_hash,
2695 )
2696 {
2698 "Double demand received found!\n");
2699 GNUNET_break (0);
2701 return GNUNET_NO;
2702 }
2703 ;
2704#if MEASURE_PERFORMANCE
2705 perf_store.offer.sent += 1;
2706 perf_store.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
2707#endif
2709 sizeof(struct GNUNET_HashCode),
2711 GNUNET_assert (NULL != ev);
2712 *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
2714 "[OP %p] sending element offer (%s) to peer\n",
2715 op,
2717 GNUNET_MQ_send (op->mq, ev);
2718 return GNUNET_YES;
2719}
2720
2721
2728void
2730 struct IBF_Key ibf_key)
2731{
2732 struct SendElementClosure send_cls;
2733
2734 send_cls.ibf_key = ibf_key;
2735 send_cls.op = op;
2737 op->key_to_element,
2738 (uint32_t) ibf_key.
2739 key_val,
2741 &send_cls);
2742}
2743
2744
2752static int
2754{
2755 struct IBF_Key key;
2756 struct IBF_Key last_key;
2757 int side;
2758 unsigned int num_decoded;
2759 struct InvertibleBloomFilter *diff_ibf;
2760
2762
2763 if (GNUNET_OK !=
2764 prepare_ibf (op,
2765 op->remote_ibf->size))
2766 {
2767 GNUNET_break (0);
2768 /* allocation failed */
2769 return GNUNET_SYSERR;
2770 }
2771
2772 diff_ibf = ibf_dup (op->local_ibf);
2773 ibf_subtract (diff_ibf,
2774 op->remote_ibf);
2775
2776 ibf_destroy (op->remote_ibf);
2777 op->remote_ibf = NULL;
2778
2780 "decoding IBF (size=%u)\n",
2781 diff_ibf->size);
2782
2783 num_decoded = 0;
2784 key.key_val = 0; /* just to avoid compiler thinking we use undef'ed variable */
2785
2786 while (1)
2787 {
2788 int res;
2789 int cycle_detected = GNUNET_NO;
2790
2791 last_key = key;
2792
2793 res = ibf_decode (diff_ibf,
2794 &side,
2795 &key);
2796 if (res == GNUNET_OK)
2797 {
2799 "decoded ibf key %lx\n",
2800 (unsigned long) key.key_val);
2801 num_decoded += 1;
2802 if ((num_decoded > diff_ibf->size) ||
2803 ((num_decoded > 1) &&
2804 (last_key.key_val == key.key_val)))
2805 {
2807 "detected cyclic ibf (decoded %u/%u)\n",
2808 num_decoded,
2809 diff_ibf->size);
2810 cycle_detected = GNUNET_YES;
2811 }
2812 }
2813 if ((GNUNET_SYSERR == res) ||
2814 (GNUNET_YES == cycle_detected))
2815 {
2816 uint32_t next_size;
2819 next_size = get_next_ibf_size (op->ibf_bucket_number_factor, num_decoded,
2820 diff_ibf->size);
2823 uint32_t ibf_min_size = IBF_MIN_SIZE | 1;
2824
2825 if (next_size<ibf_min_size)
2826 next_size = ibf_min_size;
2827
2828
2829 if (next_size <= MAX_IBF_SIZE)
2830 {
2832 "decoding failed, sending larger ibf (size %u)\n",
2833 next_size);
2835 "# of IBF retries",
2836 1,
2837 GNUNET_NO);
2838#if MEASURE_PERFORMANCE
2839 perf_store.active_passive_switches += 1;
2840#endif
2841
2842 op->salt_send = op->salt_receive++;
2843
2844 if (GNUNET_OK !=
2845 send_ibf (op, next_size))
2846 {
2847 /* Internal error, best we can do is shut the connection */
2849 "Failed to send IBF, closing connection\n");
2851 ibf_destroy (diff_ibf);
2852 return GNUNET_SYSERR;
2853 }
2854 }
2855 else
2856 {
2858 "# of failed union operations (too large)",
2859 1,
2860 GNUNET_NO);
2861 // XXX: Send the whole set, element-by-element
2863 "set union failed: reached ibf limit\n");
2865 ibf_destroy (diff_ibf);
2866 return GNUNET_SYSERR;
2867 }
2868 break;
2869 }
2870 if (GNUNET_NO == res)
2871 {
2872 struct GNUNET_MQ_Envelope *ev;
2873
2875 "transmitted all values, sending DONE\n");
2876
2877#if MEASURE_PERFORMANCE
2878 perf_store.done.sent += 1;
2879#endif
2881 GNUNET_MQ_send (op->mq, ev);
2882 /* We now wait until we get a DONE message back
2883 * and then wait for our MQ to be flushed and all our
2884 * demands be delivered. */
2885 break;
2886 }
2887 if (1 == side)
2888 {
2889 struct IBF_Key unsalted_key;
2890 unsalt_key (&key,
2891 op->salt_receive,
2892 &unsalted_key);
2894 unsalted_key);
2895 }
2896 else if (-1 == side)
2897 {
2898 struct GNUNET_MQ_Envelope *ev;
2899 struct InquiryMessage *msg;
2900
2901#if MEASURE_PERFORMANCE
2902 perf_store.inquery.sent += 1;
2903 perf_store.inquery.sent_var_bytes += sizeof(struct IBF_Key);
2904#endif
2905
2907 struct GNUNET_HashContext *hashed_key_context =
2909 struct GNUNET_HashCode *hashed_key = (struct
2911 sizeof(struct GNUNET_HashCode));
2913 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
2914 &key,
2915 sizeof(struct IBF_Key));
2916 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
2917 hashed_key);
2918 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
2919 hashed_key,
2920 &mcfs,
2922 );
2923
2924 /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
2925 * the effort additional complexity. */
2927 sizeof(struct IBF_Key),
2929 msg->salt = htonl (op->salt_receive);
2930 GNUNET_memcpy (&msg[1],
2931 &key,
2932 sizeof(struct IBF_Key));
2934 "sending element inquiry for IBF key %lx\n",
2935 (unsigned long) key.key_val);
2936 GNUNET_MQ_send (op->mq, ev);
2937 }
2938 else
2939 {
2940 GNUNET_assert (0);
2941 }
2942 }
2943 ibf_destroy (diff_ibf);
2944 return GNUNET_OK;
2945}
2946
2947
2955static int
2957 const struct TransmitFullMessage *msg)
2958{
2959 return GNUNET_OK;
2960}
2961
2962
2969static void
2971 const struct TransmitFullMessage *msg)
2972{
2973 struct Operation *op = cls;
2974
2978 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
2979 if (GNUNET_OK !=
2980 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
2981 {
2982 GNUNET_break (0);
2984 return;
2985 }
2986
2988 op->remote_element_count = ntohl (msg->remote_set_size);
2989 op->remote_set_diff = ntohl (msg->remote_set_difference);
2990 op->local_set_diff = ntohl (msg->local_set_difference);
2991
2994 {
2996 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
2997 "criteria\n");
2998 GNUNET_break_op (0);
3000 return;
3001 }
3002
3004 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3005 op->set->content->elements);
3006 uint64_t avg_element_size = 0;
3007 if (0 < op->local_element_count)
3008 {
3009 op->total_elements_size_local = 0;
3010 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3011 &
3013 op);
3014 avg_element_size = op->total_elements_size_local / op->local_element_count;
3015 }
3016
3018 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3019 op->
3021 op->
3023 op->local_set_diff,
3024 op->remote_set_diff,
3025 op->
3027 op->
3030 {
3032 "PROTOCOL VIOLATION: Remote peer choose to send his full set first but correct mode would have been"
3033 " : %d\n", mode_of_operation);
3034 GNUNET_break_op (0);
3036 return;
3037 }
3038 op->phase = PHASE_FULL_RECEIVING;
3039}
3040
3041
3052static int
3054 const struct IBFMessage *msg)
3055{
3056 struct Operation *op = cls;
3057 unsigned int buckets_in_message;
3058
3059 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3061 if (0 == buckets_in_message)
3062 {
3063 GNUNET_break_op (0);
3064 return GNUNET_SYSERR;
3065 }
3066 if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message
3068 {
3069 GNUNET_break_op (0);
3070 return GNUNET_SYSERR;
3071 }
3072 if (op->phase == PHASE_EXPECT_IBF_LAST)
3073 {
3074 if (ntohl (msg->offset) != op->ibf_buckets_received)
3075 {
3076 GNUNET_break_op (0);
3077 return GNUNET_SYSERR;
3078 }
3079
3080 if (msg->ibf_size != op->remote_ibf->size)
3081 {
3082 GNUNET_break_op (0);
3083 return GNUNET_SYSERR;
3084 }
3085 if (ntohl (msg->salt) != op->salt_receive)
3086 {
3087 GNUNET_break_op (0);
3088 return GNUNET_SYSERR;
3089 }
3090 }
3091 else if ((op->phase != PHASE_PASSIVE_DECODING) &&
3092 (op->phase != PHASE_EXPECT_IBF))
3093 {
3094 GNUNET_break_op (0);
3095 return GNUNET_SYSERR;
3096 }
3097
3098 return GNUNET_OK;
3099}
3100
3101
3111static void
3113 const struct IBFMessage *msg)
3114{
3115 struct Operation *op = cls;
3116 unsigned int buckets_in_message;
3120 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF, PHASE_EXPECT_IBF_LAST,
3122 if (GNUNET_OK !=
3123 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3124 {
3125 GNUNET_break (0);
3127 return;
3128 }
3129 op->differential_sync_iterations++;
3131 op->active_passive_switch_required = false;
3132
3133#if MEASURE_PERFORMANCE
3134 perf_store.ibf.received += 1;
3135 perf_store.ibf.received_var_bytes += (ntohs (msg->header.size) - sizeof *msg);
3136#endif
3137
3138 buckets_in_message = (ntohs (msg->header.size) - sizeof *msg)
3140 if ((op->phase == PHASE_PASSIVE_DECODING) ||
3141 (op->phase == PHASE_EXPECT_IBF))
3142 {
3143 op->phase = PHASE_EXPECT_IBF_LAST;
3144 GNUNET_assert (NULL == op->remote_ibf);
3146 "Creating new ibf of size %u\n",
3147 ntohl (msg->ibf_size));
3148 // op->remote_ibf = ibf_create (1 << msg->order, SE_IBF_HASH_NUM);
3149 op->remote_ibf = ibf_create (msg->ibf_size,
3150 ((uint8_t) op->ibf_number_buckets_per_element));
3151 op->salt_receive = ntohl (msg->salt);
3153 "Receiving new IBF with salt %u\n",
3154 op->salt_receive);
3155 if (NULL == op->remote_ibf)
3156 {
3158 "Failed to parse remote IBF, closing connection\n");
3160 return;
3161 }
3162 op->ibf_buckets_received = 0;
3163 if (0 != ntohl (msg->offset))
3164 {
3165 GNUNET_break_op (0);
3167 return;
3168 }
3169 }
3170 else
3171 {
3174 "Received more of IBF\n");
3175 }
3176 GNUNET_assert (NULL != op->remote_ibf);
3177
3178 ibf_read_slice (&msg[1],
3179 op->ibf_buckets_received,
3180 buckets_in_message,
3181 op->remote_ibf, msg->ibf_counter_bit_length);
3182 op->ibf_buckets_received += buckets_in_message;
3183
3184 if (op->ibf_buckets_received == op->remote_ibf->size)
3185 {
3187 "received full ibf\n");
3188 op->phase = PHASE_ACTIVE_DECODING;
3189 if (GNUNET_OK !=
3191 {
3192 /* Internal error, best we can do is shut down */
3194 "Failed to decode IBF, closing connection\n");
3196 return;
3197 }
3198 }
3199 GNUNET_CADET_receive_done (op->channel);
3200}
3201
3202
3211static void
3213 const struct GNUNET_SETU_Element *element,
3215{
3216 struct GNUNET_MQ_Envelope *ev;
3217 struct GNUNET_SETU_ResultMessage *rm;
3218
3220 "sending element (size %u) to client\n",
3221 element->size);
3222 GNUNET_assert (0 != op->client_request_id);
3223 ev = GNUNET_MQ_msg_extra (rm,
3224 element->size,
3226 if (NULL == ev)
3227 {
3228 GNUNET_MQ_discard (ev);
3229 GNUNET_break (0);
3230 return;
3231 }
3232 rm->result_status = htons (status);
3233 rm->request_id = htonl (op->client_request_id);
3234 rm->element_type = htons (element->element_type);
3236 op->key_to_element));
3237 GNUNET_memcpy (&rm[1],
3238 element->data,
3239 element->size);
3240 GNUNET_MQ_send (op->set->cs->mq,
3241 ev);
3242}
3243
3244
3250static void
3252{
3253 unsigned int num_demanded;
3254
3255 num_demanded = GNUNET_CONTAINER_multihashmap_size (
3256 op->demanded_hashes);
3258 op->message_control_flow,
3259 &
3261 op);
3262 if (PHASE_FINISH_WAITING == op->phase)
3263 {
3265 "In PHASE_FINISH_WAITING, pending %u demands -> %d\n",
3266 num_demanded, op->peer_site);
3267 if (-1 != send_done)
3268 {
3269 struct GNUNET_MQ_Envelope *ev;
3270
3271 op->phase = PHASE_FINISHED;
3272#if MEASURE_PERFORMANCE
3273 perf_store.done.sent += 1;
3274#endif
3276 GNUNET_MQ_send (op->mq,
3277 ev);
3278 /* We now wait until the other peer sends P2P_OVER
3279 * after it got all elements from us. */
3280 }
3281 }
3282 if (PHASE_FINISH_CLOSING == op->phase)
3283 {
3285 "In PHASE_FINISH_CLOSING, pending %u demands %d\n",
3286 num_demanded, op->peer_site);
3287 if (-1 != send_done)
3288 {
3289 op->phase = PHASE_FINISHED;
3292 }
3293 }
3294}
3295
3296
3303static int
3305 const struct GNUNET_SETU_ElementMessage *emsg)
3306{
3307 struct Operation *op = cls;
3308
3309 if (0 == GNUNET_CONTAINER_multihashmap_size (op->demanded_hashes))
3310 {
3311 GNUNET_break_op (0);
3312 return GNUNET_SYSERR;
3313 }
3314 return GNUNET_OK;
3315}
3316
3317
3326static void
3328 const struct GNUNET_SETU_ElementMessage *emsg)
3329{
3330 struct Operation *op = cls;
3331 struct ElementEntry *ee;
3332 struct KeyEntry *ke;
3333 uint16_t element_size;
3334
3338 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3340 if (GNUNET_OK !=
3341 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3342 {
3343 GNUNET_break (0);
3345 return;
3346 }
3347
3348 element_size = ntohs (emsg->header.size) - sizeof(struct
3350#if MEASURE_PERFORMANCE
3351 perf_store.element.received += 1;
3352 perf_store.element.received_var_bytes += element_size;
3353#endif
3354
3355 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3356 GNUNET_memcpy (&ee[1],
3357 &emsg[1],
3358 element_size);
3359 ee->element.size = element_size;
3360 ee->element.data = &ee[1];
3361 ee->element.element_type = ntohs (emsg->element_type);
3362 ee->remote = GNUNET_YES;
3364 &ee->element_hash);
3365 if (GNUNET_NO ==
3366 GNUNET_CONTAINER_multihashmap_remove (op->demanded_hashes,
3367 &ee->element_hash,
3368 NULL))
3369 {
3370 /* We got something we didn't demand, since it's not in our map. */
3371 GNUNET_break_op (0);
3373 return;
3374 }
3375
3376 if (GNUNET_OK !=
3378 op->message_control_flow,
3380 &ee->element_hash,
3382 )
3383 {
3385 "An element has been received more than once!\n");
3386 GNUNET_break (0);
3388 return;
3389 }
3390
3392 "Got element (size %u, hash %s) from peer\n",
3393 (unsigned int) element_size,
3394 GNUNET_h2s (&ee->element_hash));
3395
3397 "# received elements",
3398 1,
3399 GNUNET_NO);
3401 "# exchanged elements",
3402 1,
3403 GNUNET_NO);
3404
3405 op->received_total++;
3406
3407 ke = op_get_element (op,
3408 &ee->element_hash);
3409 if (NULL != ke)
3410 {
3411 /* Got repeated element. Should not happen since
3412 * we track demands. */
3414 "# repeated elements",
3415 1,
3416 GNUNET_NO);
3417 ke->received = GNUNET_YES;
3418 GNUNET_free (ee);
3419 }
3420 else
3421 {
3423 "Registering new element from remote peer\n");
3424 op->received_fresh++;
3426 /* only send results immediately if the client wants it */
3428 &ee->element,
3430 }
3431
3432 if ((op->received_total > 8) &&
3433 (op->received_fresh < op->received_total / 3))
3434 {
3435 /* The other peer gave us lots of old elements, there's something wrong. */
3436 GNUNET_break_op (0);
3438 return;
3439 }
3440 GNUNET_CADET_receive_done (op->channel);
3441 maybe_finish (op);
3442}
3443
3444
3451static int
3453 const struct GNUNET_SETU_ElementMessage *emsg)
3454{
3455 struct Operation *op = cls;
3456
3457 (void) op;
3458
3459 // FIXME: check that we expect full elements here?
3460 return GNUNET_OK;
3461}
3462
3463
3470static void
3472 const struct GNUNET_SETU_ElementMessage *emsg)
3473{
3474 struct Operation *op = cls;
3475 struct ElementEntry *ee;
3476 struct KeyEntry *ke;
3477 uint16_t element_size;
3478
3482 uint8_t allowed_phases[] = {PHASE_FULL_RECEIVING, PHASE_FULL_SENDING};
3483 if (GNUNET_OK !=
3484 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3485 {
3486 GNUNET_break (0);
3488 return;
3489 }
3490
3491 element_size = ntohs (emsg->header.size)
3492 - sizeof(struct GNUNET_SETU_ElementMessage);
3493
3494#if MEASURE_PERFORMANCE
3495 perf_store.element_full.received += 1;
3496 perf_store.element_full.received_var_bytes += element_size;
3497#endif
3498
3499 ee = GNUNET_malloc (sizeof(struct ElementEntry) + element_size);
3500 GNUNET_memcpy (&ee[1], &emsg[1], element_size);
3501 ee->element.size = element_size;
3502 ee->element.data = &ee[1];
3503 ee->element.element_type = ntohs (emsg->element_type);
3504 ee->remote = GNUNET_YES;
3506 &ee->element_hash);
3508 "Got element (full diff, size %u, hash %s) from peer\n",
3509 (unsigned int) element_size,
3510 GNUNET_h2s (&ee->element_hash));
3511
3513 "# received elements",
3514 1,
3515 GNUNET_NO);
3517 "# exchanged elements",
3518 1,
3519 GNUNET_NO);
3520
3521 op->received_total++;
3522 ke = op_get_element (op,
3523 &ee->element_hash);
3524 if (NULL != ke)
3525 {
3527 "# repeated elements",
3528 1,
3529 GNUNET_NO);
3531 ke->received = GNUNET_YES;
3532 GNUNET_free (ee);
3533 }
3534 else
3535 {
3537 "Registering new element from remote peer\n");
3538 op->received_fresh++;
3540 /* only send results immediately if the client wants it */
3542 &ee->element,
3544 }
3545
3546
3547 if ((GNUNET_YES == op->byzantine) &&
3548 (op->received_total > op->remote_element_count) )
3549 {
3550 /* The other peer gave us lots of old elements, there's something wrong. */
3552 "Other peer sent %llu elements while pretending to have %llu elements, failing operation\n",
3553 (unsigned long long) op->received_total,
3554 (unsigned long long) op->remote_element_count);
3555 GNUNET_break_op (0);
3557 return;
3558 }
3559 GNUNET_CADET_receive_done (op->channel);
3560}
3561
3562
3570static int
3572 const struct InquiryMessage *msg)
3573{
3574 struct Operation *op = cls;
3575 unsigned int num_keys;
3576
3577 if (op->phase != PHASE_PASSIVE_DECODING)
3578 {
3579 GNUNET_break_op (0);
3580 return GNUNET_SYSERR;
3581 }
3582 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3583 / sizeof(struct IBF_Key);
3584 if ((ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3585 != num_keys * sizeof(struct IBF_Key))
3586 {
3587 GNUNET_break_op (0);
3588 return GNUNET_SYSERR;
3589 }
3590 return GNUNET_OK;
3591}
3592
3593
3600static void
3602 const struct InquiryMessage *msg)
3603{
3604 struct Operation *op = cls;
3605 const struct IBF_Key *ibf_key;
3606 unsigned int num_keys;
3607
3611 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
3612 if (GNUNET_OK !=
3613 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3614 {
3615 GNUNET_break (0);
3617 return;
3618 }
3619
3620#if MEASURE_PERFORMANCE
3621 perf_store.inquery.received += 1;
3622 perf_store.inquery.received_var_bytes += (ntohs (msg->header.size)
3623 - sizeof(struct InquiryMessage));
3624#endif
3625
3627 "Received union inquiry\n");
3628 num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
3629 / sizeof(struct IBF_Key);
3630 ibf_key = (const struct IBF_Key *) &msg[1];
3631
3633 struct GNUNET_HashContext *hashed_key_context =
3635 struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc (
3636 sizeof(struct GNUNET_HashCode));;
3638 GNUNET_CRYPTO_hash_context_read (hashed_key_context,
3639 &ibf_key,
3640 sizeof(struct IBF_Key));
3641 GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
3642 hashed_key);
3643 GNUNET_CONTAINER_multihashmap_put (op->inquiries_sent,
3644 hashed_key,
3645 &mcfs,
3647 );
3648
3649 while (0 != num_keys--)
3650 {
3651 struct IBF_Key unsalted_key;
3652 unsalt_key (ibf_key,
3653 ntohl (msg->salt),
3654 &unsalted_key);
3656 unsalted_key);
3657 ibf_key++;
3658 }
3659 GNUNET_CADET_receive_done (op->channel);
3660}
3661
3662
3673static int
3675 uint32_t key,
3676 void *value)
3677{
3678 struct Operation *op = cls;
3679 struct KeyEntry *ke = value;
3680 struct GNUNET_MQ_Envelope *ev;
3681 struct GNUNET_SETU_ElementMessage *emsg;
3682 struct ElementEntry *ee = ke->element;
3683
3684 if (GNUNET_YES == ke->received)
3685 return GNUNET_YES;
3686#if MEASURE_PERFORMANCE
3687 perf_store.element_full.received += 1;
3688#endif
3689 ev = GNUNET_MQ_msg_extra (emsg,
3690 ee->element.size,
3692 GNUNET_memcpy (&emsg[1],
3693 ee->element.data,
3694 ee->element.size);
3695 emsg->element_type = htons (ee->element.element_type);
3696 GNUNET_MQ_send (op->mq,
3697 ev);
3698 return GNUNET_YES;
3699}
3700
3701
3708static int
3710 const struct TransmitFullMessage *mh)
3711{
3712 return GNUNET_OK;
3713}
3714
3715
3716static void
3718 const struct TransmitFullMessage *msg)
3719{
3720 struct Operation *op = cls;
3721
3725 uint8_t allowed_phases[] = {PHASE_EXPECT_IBF};
3726 if (GNUNET_OK !=
3727 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3728 {
3729 GNUNET_break (0);
3731 return;
3732 }
3733
3734 op->remote_element_count = ntohl (msg->remote_set_size);
3735 op->remote_set_diff = ntohl (msg->remote_set_difference);
3736 op->local_set_diff = ntohl (msg->local_set_difference);
3737
3738
3740 {
3742 "PROTOCOL VIOLATION: Parameters transmitted from other peer do not satisfie byzantine "
3743 "criteria\n");
3744 GNUNET_break_op (0);
3746 return;
3747 }
3748
3749#if MEASURE_PERFORMANCE
3750 perf_store.request_full.received += 1;
3751#endif
3752
3754 "Received request for full set transmission\n");
3755
3757 op->local_element_count = GNUNET_CONTAINER_multihashmap_size (
3758 op->set->content->elements);
3759 uint64_t avg_element_size = 0;
3760 if (0 < op->local_element_count)
3761 {
3762 op->total_elements_size_local = 0;
3763 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
3764 &
3766 op);
3767 avg_element_size = op->total_elements_size_local / op->local_element_count;
3768 }
3769
3770 int mode_of_operation = estimate_best_mode_of_operation (avg_element_size,
3771 op->
3773 op->
3775 op->local_set_diff,
3776 op->remote_set_diff,
3777 op->
3779 op->
3782 {
3784 "PROTOCOL VIOLATION: Remote peer choose to request the full set first but correct mode would have been"
3785 " : %d\n", mode_of_operation);
3786 GNUNET_break_op (0);
3788 return;
3789 }
3790
3791 // FIXME: we need to check that our set is larger than the
3792 // byzantine_lower_bound by some threshold
3793 send_full_set (op);
3794 GNUNET_CADET_receive_done (op->channel);
3795}
3796
3797
3804static void
3806 const struct GNUNET_MessageHeader *mh)
3807{
3808 struct Operation *op = cls;
3809
3813 uint8_t allowed_phases[] = {PHASE_FULL_SENDING, PHASE_FULL_RECEIVING};
3814 if (GNUNET_OK !=
3815 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3816 {
3817 GNUNET_break (0);
3819 return;
3820 }
3821
3822#if MEASURE_PERFORMANCE
3823 perf_store.full_done.received += 1;
3824#endif
3825
3826 switch (op->phase)
3827 {
3829 {
3830 struct GNUNET_MQ_Envelope *ev;
3831
3832 if ((GNUNET_YES == op->byzantine) &&
3833 (op->received_total != op->remote_element_count) )
3834 {
3835 /* The other peer gave not enough elements before sending full done, there's something wrong. */
3837 "Other peer sent only %llu/%llu fresh elements, failing operation\n",
3838 (unsigned long long) op->received_total,
3839 (unsigned long long) op->remote_element_count);
3840 GNUNET_break_op (0);
3842 return;
3843 }
3844
3846 "got FULL DONE, sending elements that other peer is missing\n");
3847
3848 /* send all the elements that did not come from the remote peer */
3851 op);
3852#if MEASURE_PERFORMANCE
3853 perf_store.full_done.sent += 1;
3854#endif
3856 GNUNET_MQ_send (op->mq,
3857 ev);
3858 op->phase = PHASE_FINISHED;
3859 /* we now wait until the other peer sends us the OVER message*/
3860 }
3861 break;
3862
3863 case PHASE_FULL_SENDING:
3864 {
3866 "got FULL DONE, finishing\n");
3867 /* We sent the full set, and got the response for that. We're done. */
3868 op->phase = PHASE_FINISHED;
3869 GNUNET_CADET_receive_done (op->channel);
3872 return;
3873 }
3874
3875 default:
3877 "Handle full done phase is %u\n",
3878 (unsigned) op->phase);
3879 GNUNET_break_op (0);
3881 return;
3882 }
3883 GNUNET_CADET_receive_done (op->channel);
3884}
3885
3886
3895static int
3897 const struct GNUNET_MessageHeader *mh)
3898{
3899 struct Operation *op = cls;
3900 unsigned int num_hashes;
3901
3902 (void) op;
3903 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3904 / sizeof(struct GNUNET_HashCode);
3905 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3906 != num_hashes * sizeof(struct GNUNET_HashCode))
3907 {
3908 GNUNET_break_op (0);
3909 return GNUNET_SYSERR;
3910 }
3911 return GNUNET_OK;
3912}
3913
3914
3922static void
3924 const struct GNUNET_MessageHeader *mh)
3925{
3926 struct Operation *op = cls;
3927 struct ElementEntry *ee;
3928 struct GNUNET_SETU_ElementMessage *emsg;
3929 const struct GNUNET_HashCode *hash;
3930 unsigned int num_hashes;
3931 struct GNUNET_MQ_Envelope *ev;
3932
3936 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING,
3938 if (GNUNET_OK !=
3939 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
3940 {
3941 GNUNET_break (0);
3943 return;
3944 }
3945#if MEASURE_PERFORMANCE
3946 perf_store.demand.received += 1;
3947 perf_store.demand.received_var_bytes += (ntohs (mh->size) - sizeof(struct
3949#endif
3950
3951 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
3952 / sizeof(struct GNUNET_HashCode);
3953 for (hash = (const struct GNUNET_HashCode *) &mh[1];
3954 num_hashes > 0;
3955 hash++, num_hashes--)
3956 {
3957 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
3958 hash);
3959 if (NULL == ee)
3960 {
3961 /* Demand for non-existing element. */
3962 GNUNET_break_op (0);
3964 return;
3965 }
3966
3967 /* Save send demand message for message control */
3968 if (GNUNET_YES !=
3970 op->message_control_flow,
3972 &ee->element_hash,
3974 )
3975 {
3977 "Double demand message received found!\n");
3978 GNUNET_break (0);
3980 return;
3981 }
3982 ;
3983
3984 /* Mark element to be expected to received */
3985 if (GNUNET_YES !=
3987 op->message_control_flow,
3989 &ee->element_hash,
3991 )
3992 {
3994 "Double element message sent found!\n");
3995 GNUNET_break (0);
3997 return;
3998 }
4000 {
4001 /* Probably confused lazily copied sets. */
4002 GNUNET_break_op (0);
4004 return;
4005 }
4006#if MEASURE_PERFORMANCE
4007 perf_store.element.sent += 1;
4008 perf_store.element.sent_var_bytes += ee->element.size;
4009#endif
4010 ev = GNUNET_MQ_msg_extra (emsg,
4011 ee->element.size,
4013 GNUNET_memcpy (&emsg[1],
4014 ee->element.data,
4015 ee->element.size);
4016 emsg->reserved = htons (0);
4017 emsg->element_type = htons (ee->element.element_type);
4019 "[OP %p] Sending demanded element (size %u, hash %s) to peer\n",
4020 op,
4021 (unsigned int) ee->element.size,
4022 GNUNET_h2s (&ee->element_hash));
4023 GNUNET_MQ_send (op->mq, ev);
4025 "# exchanged elements",
4026 1,
4027 GNUNET_NO);
4028 if (op->symmetric)
4030 &ee->element,
4032 }
4033 GNUNET_CADET_receive_done (op->channel);
4034 maybe_finish (op);
4035}
4036
4037
4045static int
4047 const struct GNUNET_MessageHeader *mh)
4048{
4049 struct Operation *op = cls;
4050 unsigned int num_hashes;
4051
4052 /* look up elements and send them */
4053 if ((op->phase != PHASE_PASSIVE_DECODING) &&
4054 (op->phase != PHASE_ACTIVE_DECODING))
4055 {
4056 GNUNET_break_op (0);
4057 return GNUNET_SYSERR;
4058 }
4059 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4060 / sizeof(struct GNUNET_HashCode);
4061 if ((ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader)) !=
4062 num_hashes * sizeof(struct GNUNET_HashCode))
4063 {
4064 GNUNET_break_op (0);
4065 return GNUNET_SYSERR;
4066 }
4067 return GNUNET_OK;
4068}
4069
4070
4078static void
4080 const struct GNUNET_MessageHeader *mh)
4081{
4082 struct Operation *op = cls;
4083 const struct GNUNET_HashCode *hash;
4084 unsigned int num_hashes;
4088 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4089 if (GNUNET_OK !=
4090 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4091 {
4092 GNUNET_break (0);
4094 return;
4095 }
4096
4097#if MEASURE_PERFORMANCE
4098 perf_store.offer.received += 1;
4099 perf_store.offer.received_var_bytes += (ntohs (mh->size) - sizeof(struct
4101#endif
4102
4103 num_hashes = (ntohs (mh->size) - sizeof(struct GNUNET_MessageHeader))
4104 / sizeof(struct GNUNET_HashCode);
4105 for (hash = (const struct GNUNET_HashCode *) &mh[1];
4106 num_hashes > 0;
4107 hash++, num_hashes--)
4108 {
4109 struct ElementEntry *ee;
4110 struct GNUNET_MessageHeader *demands;
4111 struct GNUNET_MQ_Envelope *ev;
4112
4113 ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
4114 hash);
4115 if (NULL != ee)
4117 continue;
4118
4119 if (GNUNET_YES ==
4121 hash))
4122 {
4124 "Skipped sending duplicate demand\n");
4125 continue;
4126 }
4127
4130 op->demanded_hashes,
4131 hash,
4132 NULL,
4134
4136 "[OP %p] Requesting element (hash %s)\n",
4137 op, GNUNET_h2s (hash));
4138
4139#if MEASURE_PERFORMANCE
4140 perf_store.demand.sent += 1;
4141 perf_store.demand.sent_var_bytes += sizeof(struct GNUNET_HashCode);
4142#endif
4143 /* Save send demand message for message control */
4144 if (GNUNET_YES !=
4146 op->message_control_flow,
4148 hash,
4150 {
4152 "Double demand message sent found!\n");
4153 GNUNET_break (0);
4155 return;
4156 }
4157
4158 /* Mark offer as received received */
4159 if (GNUNET_YES !=
4161 op->message_control_flow,
4163 hash,
4165 {
4167 "Double offer message received found!\n");
4168 GNUNET_break (0);
4170 return;
4171 }
4172 /* Mark element to be expected to received */
4173 if (GNUNET_YES !=
4175 op->message_control_flow,
4177 hash,
4179 {
4181 "Element already expected!\n");
4182 GNUNET_break (0);
4184 return;
4185 }
4186 ev = GNUNET_MQ_msg_header_extra (demands,
4187 sizeof(struct GNUNET_HashCode),
4189 GNUNET_memcpy (&demands[1],
4190 hash,
4191 sizeof(struct GNUNET_HashCode));
4192 GNUNET_MQ_send (op->mq, ev);
4193 }
4194 GNUNET_CADET_receive_done (op->channel);
4195}
4196
4197
4204static void
4206 const struct GNUNET_MessageHeader *mh)
4207{
4208 struct Operation *op = cls;
4209
4213 uint8_t allowed_phases[] = {PHASE_ACTIVE_DECODING, PHASE_PASSIVE_DECODING};
4214 if (GNUNET_OK !=
4215 check_valid_phase (allowed_phases,sizeof(allowed_phases),op))
4216 {
4217 GNUNET_break (0);
4219 return;
4220 }
4221
4222 if (op->active_passive_switch_required)
4223 {
4225 "PROTOCOL VIOLATION: Received done but role change is necessary\n");
4226 GNUNET_break (0);
4228 return;
4229 }
4230
4231#if MEASURE_PERFORMANCE
4232 perf_store.done.received += 1;
4233#endif
4234 switch (op->phase)
4235 {
4237 /* We got all requests, but still have to send our elements in response. */
4238 op->phase = PHASE_FINISH_WAITING;
4240 "got DONE (as passive partner), waiting for our demands to be satisfied\n");
4241 /* The active peer is done sending offers
4242 * and inquiries. This means that all
4243 * our responses to that (demands and offers)
4244 * must be in flight (queued or in mesh).
4245 *
4246 * We should notify the active peer once
4247 * all our demands are satisfied, so that the active
4248 * peer can quit if we gave it everything.
4249 */GNUNET_CADET_receive_done (op->channel);
4250 maybe_finish (op);
4251 return;
4254 "got DONE (as active partner), waiting to finish\n");
4255 /* All demands of the other peer are satisfied,
4256 * and we processed all offers, thus we know
4257 * exactly what our demands must be.
4258 *
4259 * We'll close the channel
4260 * to the other peer once our demands are met.
4261 */op->phase = PHASE_FINISH_CLOSING;
4262 GNUNET_CADET_receive_done (op->channel);
4263 maybe_finish (op);
4264 return;
4265 default:
4266 GNUNET_break_op (0);
4268 return;
4269 }
4270}
4271
4272
4279static void
4281 const struct GNUNET_MessageHeader *mh)
4282{
4283#if MEASURE_PERFORMANCE
4284 perf_store.over.received += 1;
4285#endif
4286 send_client_done (cls);
4287}
4288
4289
4298static struct Operation *
4299get_incoming (uint32_t id)
4300{
4301 for (struct Listener *listener = listener_head;
4302 NULL != listener;
4304 {
4305 for (struct Operation *op = listener->op_head;
4306 NULL != op;
4307 op = op->next)
4308 if (op->suggest_id == id)
4309 return op;
4310 }
4311 return NULL;
4312}
4313
4314
4323static void *
4325 struct GNUNET_SERVICE_Client *c,
4326 struct GNUNET_MQ_Handle *mq)
4327{
4328 struct ClientState *cs;
4329
4330 num_clients++;
4331 cs = GNUNET_new (struct ClientState);
4332 cs->client = c;
4333 cs->mq = mq;
4334 return cs;
4335}
4336
4337
4346static int
4348 const struct GNUNET_HashCode *key,
4349 void *value)
4350{
4351 struct ElementEntry *ee = value;
4352
4353 GNUNET_free (ee);
4354 return GNUNET_YES;
4355}
4356
4357
4365static void
4367 struct GNUNET_SERVICE_Client *client,
4368 void *internal_cls)
4369{
4370 struct ClientState *cs = internal_cls;
4371 struct Operation *op;
4372 struct Listener *listener;
4373 struct Set *set;
4374
4376 "Client disconnected, cleaning up\n");
4377 if (NULL != (set = cs->set))
4378 {
4379 struct SetContent *content = set->content;
4380
4382 "Destroying client's set\n");
4383 /* Destroy pending set operations */
4384 while (NULL != set->ops_head)
4386
4387 /* Destroy operation-specific state */
4388 if (NULL != set->se)
4389 {
4391 set->se = NULL;
4392 }
4393 /* free set content (or at least decrement RC) */
4394 set->content = NULL;
4395 GNUNET_assert (0 != content->refcount);
4396 content->refcount--;
4397 if (0 == content->refcount)
4398 {
4399 GNUNET_assert (NULL != content->elements);
4402 NULL);
4404 content->elements = NULL;
4405 GNUNET_free (content);
4406 }
4407 GNUNET_free (set);
4408 }
4409
4410 if (NULL != (listener = cs->listener))
4411 {
4413 "Destroying client's listener\n");
4415 listener->open_port = NULL;
4416 while (NULL != (op = listener->op_head))
4417 {
4419 "Destroying incoming operation `%u' from peer `%s'\n",
4420 (unsigned int) op->client_request_id,
4421 GNUNET_i2s (&op->peer));
4423 }
4426 listener);
4427 GNUNET_free (listener);
4428 }
4429 GNUNET_free (cs);
4430 num_clients--;
4431 if ( (GNUNET_YES == in_shutdown) &&
4432 (0 == num_clients) )
4433 {
4434 if (NULL != cadet)
4435 {
4437 cadet = NULL;
4438 }
4439 }
4440}
4441
4442
4451static int
4453 const struct OperationRequestMessage *msg)
4454{
4455 struct Operation *op = cls;
4456 struct Listener *listener = op->listener;
4457 const struct GNUNET_MessageHeader *nested_context;
4458
4459 /* double operation request */
4460 if (0 != op->suggest_id)
4461 {
4462 GNUNET_break_op (0);
4463 return GNUNET_SYSERR;
4464 }
4465 /* This should be equivalent to the previous condition, but can't hurt to check twice */
4466 if (NULL == listener)
4467 {
4468 GNUNET_break (0);
4469 return GNUNET_SYSERR;
4470 }
4471 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4472 if ((NULL != nested_context) &&
4473 (ntohs (nested_context->size) > GNUNET_SETU_CONTEXT_MESSAGE_MAX_SIZE))
4474 {
4475 GNUNET_break_op (0);
4476 return GNUNET_SYSERR;
4477 }
4478 return GNUNET_OK;
4479}
4480
4481
4497static void
4499 const struct OperationRequestMessage *msg)
4500{
4501 struct Operation *op = cls;
4502 struct Listener *listener = op->listener;
4503 const struct GNUNET_MessageHeader *nested_context;
4504 struct GNUNET_MQ_Envelope *env;
4505 struct GNUNET_SETU_RequestMessage *cmsg;
4506
4507 nested_context = GNUNET_MQ_extract_nested_mh (msg);
4508 /* Make a copy of the nested_context (application-specific context
4509 information that is opaque to set) so we can pass it to the
4510 listener later on */
4511 if (NULL != nested_context)
4512 op->context_msg = GNUNET_copy_message (nested_context);
4513 op->remote_element_count = ntohl (msg->element_count);
4514 GNUNET_log (
4516 "Received P2P operation request (port %s) for active listener\n",
4517 GNUNET_h2s (&op->listener->app_id));
4518 GNUNET_assert (0 == op->suggest_id);
4519 if (0 == suggest_id)
4520 suggest_id++;
4521 op->suggest_id = suggest_id++;
4522 GNUNET_assert (NULL != op->timeout_task);
4523 GNUNET_SCHEDULER_cancel (op->timeout_task);
4524 op->timeout_task = NULL;
4527 op->context_msg);
4528 GNUNET_log (
4530 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
4531 op->suggest_id,
4532 listener,
4533 listener->cs);
4534 cmsg->accept_id = htonl (op->suggest_id);
4535 cmsg->peer_id = op->peer;
4536 GNUNET_MQ_send (listener->cs->mq,
4537 env);
4538 /* NOTE: GNUNET_CADET_receive_done() will be called in
4539 #handle_client_accept() */
4540}
4541
4542
4551static void
4553 const struct GNUNET_SETU_CreateMessage *msg)
4554{
4555 struct ClientState *cs = cls;
4556 struct Set *set;
4557
4559 "Client created new set for union operation\n");
4560 if (NULL != cs->set)
4561 {
4562 /* There can only be one set per client */
4563 GNUNET_break (0);
4565 return;
4566 }
4567 set = GNUNET_new (struct Set);
4568 {
4569 struct MultiStrataEstimator *se;
4570
4574 if (NULL == se)
4575 {
4577 "Failed to allocate strata estimator\n");
4578 GNUNET_free (set);
4580 return;
4581 }
4582 set->se = se;
4583 }
4584 set->content = GNUNET_new (struct SetContent);
4585 set->content->refcount = 1;
4587 GNUNET_YES);
4588 set->cs = cs;
4589 cs->set = set;
4591}
4592
4593
4603static void
4605{
4606 struct Operation *op = cls;
4607
4608 op->timeout_task = NULL;
4610 "Remote peer's incoming request timed out\n");
4612}
4613
4614
4631static void *
4634 const struct GNUNET_PeerIdentity *source)
4635{
4636 struct Listener *listener = cls;
4637 struct Operation *op;
4638
4640 "New incoming channel\n");
4641 op = GNUNET_new (struct Operation);
4642 op->listener = listener;
4643 op->peer = *source;
4644 op->channel = channel;
4645 op->mq = GNUNET_CADET_get_mq (op->channel);
4647 UINT32_MAX);
4650 op);
4653 op);
4654 return op;
4655}
4656
4657
4674static void
4675channel_end_cb (void *channel_ctx,
4676 const struct GNUNET_CADET_Channel *channel)
4677{
4678 struct Operation *op = channel_ctx;
4679
4680 op->channel = NULL;
4682}
4683
4684
4699static void
4701 const struct GNUNET_CADET_Channel *channel,
4702 int window_size)
4703{
4704 /* FIXME: not implemented, we could do flow control here... */
4705}
4706
4707
4715static void
4717 const struct GNUNET_SETU_ListenMessage *msg)
4718{
4719 struct ClientState *cs = cls;
4720 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4721 GNUNET_MQ_hd_var_size (incoming_msg,
4724 NULL),
4725 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4727 struct IBFMessage,
4728 NULL),
4729 GNUNET_MQ_hd_var_size (union_p2p_elements,
4732 NULL),
4733 GNUNET_MQ_hd_var_size (union_p2p_offer,
4735 struct GNUNET_MessageHeader,
4736 NULL),
4737 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4739 struct InquiryMessage,
4740 NULL),
4741 GNUNET_MQ_hd_var_size (union_p2p_demand,
4743 struct GNUNET_MessageHeader,
4744 NULL),
4745 GNUNET_MQ_hd_fixed_size (union_p2p_done,
4747 struct GNUNET_MessageHeader,
4748 NULL),
4749 GNUNET_MQ_hd_fixed_size (union_p2p_over,
4751 struct GNUNET_MessageHeader,
4752 NULL),
4753 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
4755 struct GNUNET_MessageHeader,
4756 NULL),
4757 GNUNET_MQ_hd_var_size (union_p2p_request_full,
4759 struct TransmitFullMessage,
4760 NULL),
4761 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4764 NULL),
4765 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
4768 NULL),
4769 GNUNET_MQ_hd_var_size (union_p2p_full_element,
4772 NULL),
4773 GNUNET_MQ_hd_var_size (union_p2p_send_full,
4775 struct TransmitFullMessage,
4776 NULL),
4778 };
4779 struct Listener *listener;
4780
4781 if (NULL != cs->listener)
4782 {
4783 /* max. one active listener per client! */
4784 GNUNET_break (0);
4786 return;
4787 }
4788 listener = GNUNET_new (struct Listener);
4789 listener->cs = cs;
4790 cs->listener = listener;
4791 listener->app_id = msg->app_id;
4794 listener);
4796 "New listener created (port %s)\n",
4797 GNUNET_h2s (&listener->app_id));
4799 &msg->app_id,
4801 listener,
4804 cadet_handlers);
4806}
4807
4808
4816static void
4818 const struct GNUNET_SETU_RejectMessage *msg)
4819{
4820 struct ClientState *cs = cls;
4821 struct Operation *op;
4822
4823 op = get_incoming (ntohl (msg->accept_reject_id));
4824 if (NULL == op)
4825 {
4826 /* no matching incoming operation for this reject;
4827 could be that the other peer already disconnected... */
4829 "Client rejected unknown operation %u\n",
4830 (unsigned int) ntohl (msg->accept_reject_id));
4832 return;
4833 }
4835 "Peer request (app %s) rejected by client\n",
4836 GNUNET_h2s (&cs->listener->app_id));
4839}
4840
4841
4848static int
4850 const struct GNUNET_SETU_ElementMessage *msg)
4851{
4852 /* NOTE: Technically, we should probably check with the
4853 block library whether the element we are given is well-formed */
4854 return GNUNET_OK;
4855}
4856
4857
4864static void
4866 const struct GNUNET_SETU_ElementMessage *msg)
4867{
4868 struct ClientState *cs = cls;
4869 struct Set *set;
4870 struct GNUNET_SETU_Element el;
4871 struct ElementEntry *ee;
4872 struct GNUNET_HashCode hash;
4873
4874 if (NULL == (set = cs->set))
4875 {
4876 /* client without a set requested an operation */
4877 GNUNET_break (0);
4879 return;
4880 }
4882 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing mutation on set\n");
4883 el.size = ntohs (msg->header.size) - sizeof(*msg);
4884 el.data = &msg[1];
4885 el.element_type = ntohs (msg->element_type);
4887 &hash);
4889 &hash);
4890 if (NULL == ee)
4891 {
4893 "Client inserts element %s of size %u\n",
4894 GNUNET_h2s (&hash),
4895 el.size);
4896 ee = GNUNET_malloc (el.size + sizeof(*ee));
4897 ee->element.size = el.size;
4898 GNUNET_memcpy (&ee[1], el.data, el.size);
4899 ee->element.data = &ee[1];
4900 ee->element.element_type = el.element_type;
4901 ee->remote = GNUNET_NO;
4902 ee->generation = set->current_generation;
4903 ee->element_hash = hash;
4906 set->content->elements,
4907 &ee->element_hash,
4908 ee,
4910 }
4911 else
4912 {
4914 "Client inserted element %s of size %u twice (ignored)\n",
4915 GNUNET_h2s (&hash),
4916 el.size);
4917 /* same element inserted twice */
4918 return;
4919 }
4921 get_ibf_key (&ee->element_hash));
4922}
4923
4924
4931static void
4933{
4934 set->content->latest_generation++;
4935 set->current_generation++;
4936}
4937
4938
4948static int
4950 const struct GNUNET_SETU_EvaluateMessage *msg)
4951{
4952 /* FIXME: suboptimal, even if the context below could be NULL,
4953 there are malformed messages this does not check for... */
4954 return GNUNET_OK;
4955}
4956
4957
4966static void
4968 const struct GNUNET_SETU_EvaluateMessage *msg)
4969{
4970 struct ClientState *cs = cls;
4971 struct Operation *op = GNUNET_new (struct Operation);
4972
4973 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
4974 GNUNET_MQ_hd_var_size (incoming_msg,
4977 op),
4978 GNUNET_MQ_hd_var_size (union_p2p_ibf,
4980 struct IBFMessage,
4981 op),
4982 GNUNET_MQ_hd_var_size (union_p2p_elements,
4985 op),
4986 GNUNET_MQ_hd_var_size (union_p2p_offer,
4988 struct GNUNET_MessageHeader,
4989 op),
4990 GNUNET_MQ_hd_var_size (union_p2p_inquiry,
4992 struct InquiryMessage,
4993 op),
4994 GNUNET_MQ_hd_var_size (union_p2p_demand,
4996 struct GNUNET_MessageHeader,
4997 op),
4998 GNUNET_MQ_hd_fixed_size (union_p2p_done,
5000 struct GNUNET_MessageHeader,
5001 op),
5002 GNUNET_MQ_hd_fixed_size (union_p2p_over,
5004 struct GNUNET_MessageHeader,
5005 op),
5006 GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
5008 struct GNUNET_MessageHeader,
5009 op),
5010 GNUNET_MQ_hd_var_size (union_p2p_request_full,
5012 struct TransmitFullMessage,
5013 op),
5014 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5017 op),
5018 GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
5021 op),
5022 GNUNET_MQ_hd_var_size (union_p2p_full_element,
5025 op),
5026 GNUNET_MQ_hd_var_size (union_p2p_send_full,
5028 struct TransmitFullMessage,
5029 NULL),
5031 };
5032 struct Set *set;
5033 const struct GNUNET_MessageHeader *context;
5034
5035 if (NULL == (set = cs->set))
5036 {
5037 GNUNET_break (0);
5038 GNUNET_free (op);
5040 return;
5041 }
5043 UINT32_MAX);
5044 op->peer = msg->target_peer;
5045 op->client_request_id = ntohl (msg->request_id);
5046 op->byzantine = msg->byzantine;
5047 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5048 op->force_full = msg->force_full;
5049 op->force_delta = msg->force_delta;
5050 op->symmetric = msg->symmetric;
5051 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5052 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5053 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5054 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5055 op->active_passive_switch_required = false;
5057
5058 /* create hashmap for message control */
5059 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5060 GNUNET_NO);
5061 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5062
5063#if MEASURE_PERFORMANCE
5064 /* load config */
5065 load_config (op);
5066#endif
5067
5068 /* Advance generation values, so that
5069 mutations won't interfere with the running operation. */
5070 op->set = set;
5071 op->generation_created = set->current_generation;
5072 advance_generation (set);
5074 set->ops_tail,
5075 op);
5077 "Creating new CADET channel to port %s for set union\n",
5078 GNUNET_h2s (&msg->app_id));
5080 op,
5081 &msg->target_peer,
5082 &msg->app_id,
5085 cadet_handlers);
5086 op->mq = GNUNET_CADET_get_mq (op->channel);
5087 {
5088 struct GNUNET_MQ_Envelope *ev;
5090
5091#if MEASURE_PERFORMANCE
5092 perf_store.operation_request.sent += 1;
5093#endif
5096 context);
5097 if (NULL == ev)
5098 {
5099 /* the context message is too large */
5100 GNUNET_break (0);
5102 return;
5103 }
5104 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5105 GNUNET_NO);
5106 /* copy the current generation's strata estimator for this operation */
5107 op->se = strata_estimator_dup (op->set->se);
5108 /* we started the operation, thus we have to send the operation request */
5109 op->phase = PHASE_EXPECT_SE;
5110
5111 op->salt_receive = (op->peer_site + 1) % 2;
5112 op->salt_send = op->peer_site; // FIXME?????
5113
5114
5116 "Initiating union operation evaluation\n");
5118 "# of total union operations",
5119 1,
5120 GNUNET_NO);
5122 "# of initiated union operations",
5123 1,
5124 GNUNET_NO);
5125 GNUNET_MQ_send (op->mq,
5126 ev);
5127 if (NULL != context)
5129 "sent op request with context message\n");
5130 else
5132 "sent op request without context message\n");
5135 op->key_to_element);
5136
5137 }
5139}
5140
5141
5148static void
5150 const struct GNUNET_SETU_CancelMessage *msg)
5151{
5152 struct ClientState *cs = cls;
5153 struct Set *set;
5154 struct Operation *op;
5155 int found;
5156
5157 if (NULL == (set = cs->set))
5158 {
5159 /* client without a set requested an operation */
5160 GNUNET_break (0);
5162 return;
5163 }
5164 found = GNUNET_NO;
5165 for (op = set->ops_head; NULL != op; op = op->next)
5166 {
5167 if (op->client_request_id == ntohl (msg->request_id))
5168 {
5169 found = GNUNET_YES;
5170 break;
5171 }
5172 }
5173 if (GNUNET_NO == found)
5174 {
5175 /* It may happen that the operation was already destroyed due to
5176 * the other peer disconnecting. The client may not know about this
5177 * yet and try to cancel the (just barely non-existent) operation.
5178 * So this is not a hard error.
5179 *///
5181 "Client canceled non-existent op %u\n",
5182 (uint32_t) ntohl (msg->request_id));
5183 }
5184 else
5185 {
5187 "Client requested cancel for op %u\n",
5188 (uint32_t) ntohl (msg->request_id));
5190 }
5192}
5193
5194
5203static void
5205 const struct GNUNET_SETU_AcceptMessage *msg)
5206{
5207 struct ClientState *cs = cls;
5208 struct Set *set;
5209 struct Operation *op;
5210 struct GNUNET_SETU_ResultMessage *result_message;
5211 struct GNUNET_MQ_Envelope *ev;
5212 struct Listener *listener;
5213
5214 if (NULL == (set = cs->set))
5215 {
5216 /* client without a set requested to accept */
5217 GNUNET_break (0);
5219 return;
5220 }
5221 op = get_incoming (ntohl (msg->accept_reject_id));
5222 if (NULL == op)
5223 {
5224 /* It is not an error if the set op does not exist -- it may
5225 * have been destroyed when the partner peer disconnected. */
5226 GNUNET_log (
5228 "Client %p accepted request %u of listener %p that is no longer active\n",
5229 cs,
5230 ntohl (msg->accept_reject_id),
5231 cs->listener);
5232 ev = GNUNET_MQ_msg (result_message,
5234 result_message->request_id = msg->request_id;
5235 result_message->result_status = htons (GNUNET_SETU_STATUS_FAILURE);
5236 GNUNET_MQ_send (set->cs->mq, ev);
5238 return;
5239 }
5241 "Client accepting request %u\n",
5242 (uint32_t) ntohl (msg->accept_reject_id));
5243 listener = op->listener;
5244 op->listener = NULL;
5246 listener->op_tail,
5247 op);
5248 op->set = set;
5250 set->ops_tail,
5251 op);
5252 op->client_request_id = ntohl (msg->request_id);
5253 op->byzantine = msg->byzantine;
5254 op->byzantine_lower_bound = ntohl (msg->byzantine_lower_bound);
5255 op->force_full = msg->force_full;
5256 op->force_delta = msg->force_delta;
5257 op->symmetric = msg->symmetric;
5258 op->rtt_bandwidth_tradeoff = msg->bandwidth_latency_tradeoff;
5259 op->ibf_bucket_number_factor = msg->ibf_bucket_number_factor;
5260 op->ibf_number_buckets_per_element = msg->ibf_number_of_buckets_per_element;
5261 op->byzantine_upper_bound = msg->byzantine_upper_bond;
5262 op->active_passive_switch_required = false;
5263 /* create hashmap for message control */
5264 op->message_control_flow = GNUNET_CONTAINER_multihashmap_create (32,
5265 GNUNET_NO);
5266 op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create (32,GNUNET_NO);
5267
5268#if MEASURE_PERFORMANCE
5269 /* load config */
5270 load_config (op);
5271#endif
5272
5273 /* Advance generation values, so that future mutations do not
5274 interfer with the running operation. */
5275 op->generation_created = set->current_generation;
5276 advance_generation (set);
5277 GNUNET_assert (NULL == op->se);
5278
5280 "accepting set union operation\n");
5282 "# of accepted union operations",
5283 1,
5284 GNUNET_NO);
5286 "# of total union operations",
5287 1,
5288 GNUNET_NO);
5289 {
5290 struct MultiStrataEstimator *se;
5291 struct GNUNET_MQ_Envelope *ev;
5292 struct StrataEstimatorMessage *strata_msg;
5293 char *buf;
5294 size_t len;
5295 uint16_t type;
5296
5297 op->se = strata_estimator_dup (op->set->se);
5298 op->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32,
5299 GNUNET_NO);
5300 op->salt_receive = (op->peer_site + 1) % 2;
5301 op->salt_send = op->peer_site; // FIXME?????
5304 op->key_to_element);
5305
5306 /* kick off the operation */
5307 se = op->se;
5308
5309 uint8_t se_count = 1;
5310 if (op->initial_size > 0)
5311 {
5312 op->total_elements_size_local = 0;
5313 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
5314 &
5316 op);
5318 op->total_elements_size_local / op->initial_size,
5319 op->initial_size);
5320 }
5322 * ((SE_IBFS_TOTAL_SIZE / 8) * se_count));
5323 len = strata_estimator_write (se,
5325 se_count,
5326 buf);
5327#if MEASURE_PERFORMANCE
5328 perf_store.se.sent += 1;
5329 perf_store.se.sent_var_bytes += len;
5330#endif
5331
5332 if (len < se->stratas[0]->strata_count * IBF_BUCKET_SIZE
5335 else
5337 ev = GNUNET_MQ_msg_extra (strata_msg,
5338 len,
5339 type);
5340 GNUNET_memcpy (&strata_msg[1],
5341 buf,
5342 len);
5343 GNUNET_free (buf);
5344 strata_msg->set_size
5346 op->set->content->elements));
5347 strata_msg->se_count = se_count;
5348 GNUNET_MQ_send (op->mq,
5349 ev);
5350 op->phase = PHASE_EXPECT_IBF;
5351 }
5352 /* Now allow CADET to continue, as we did not do this in
5353 #handle_incoming_msg (as we wanted to first see if the
5354 local client would accept the request). */
5355 GNUNET_CADET_receive_done (op->channel);
5357}
5358
5359
5365static void
5366shutdown_task (void *cls)
5367{
5368 /* Delay actual shutdown to allow service to disconnect clients */
5370 if (0 == num_clients)
5371 {
5372 if (NULL != cadet)
5373 {
5375 cadet = NULL;
5376 }
5377 }
5379 GNUNET_YES);
5381 "handled shutdown request\n");
5382#if MEASURE_PERFORMANCE
5383 calculate_perf_store ();
5384#endif
5385}
5386
5387
5396static void
5397run (void *cls,
5398 const struct GNUNET_CONFIGURATION_Handle *cfg,
5400{
5401 /* FIXME: need to modify SERVICE (!) API to allow
5402 us to run a shutdown task *after* clients were
5403 forcefully disconnected! */
5405 NULL);
5407 cfg);
5409 if (NULL == cadet)
5410 {
5412 _ ("Could not connect to CADET service\n"));
5414 return;
5415 }
5416}
5417
5418
5423 "set",
5425 &run,
5428 NULL,
5429 GNUNET_MQ_hd_fixed_size (client_accept,
5432 NULL),
5433 GNUNET_MQ_hd_var_size (client_set_add,
5436 NULL),
5437 GNUNET_MQ_hd_fixed_size (client_create_set,
5440 NULL),
5441 GNUNET_MQ_hd_var_size (client_evaluate,
5444 NULL),
5445 GNUNET_MQ_hd_fixed_size (client_listen,
5448 NULL),
5449 GNUNET_MQ_hd_fixed_size (client_reject,
5452 NULL),
5453 GNUNET_MQ_hd_fixed_size (client_cancel,
5456 NULL),
5458
5459
5460/* end of gnunet-service-setu.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
void ibf_subtract(struct InvertibleBloomFilter *ibf1, const struct InvertibleBloomFilter *ibf2)
Subtract ibf2 from ibf1, storing the result in ibf1.
Definition: ibf.c:357
int ibf_decode(struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key *ret_id)
Decode and remove an element from the IBF, if possible.
Definition: ibf.c:229
void ibf_read_slice(const void *buf, uint32_t start, uint32_t count, struct InvertibleBloomFilter *ibf)
Read buckets from a buffer into an ibf.
Definition: ibf.c:324
void ibf_write_slice(const struct InvertibleBloomFilter *ibf, uint32_t start, uint32_t count, void *buf)
Write buckets from an ibf to a buffer.
Definition: ibf.c:291
void ibf_insert(struct InvertibleBloomFilter *ibf, struct IBF_Key key)
Insert a key into an IBF.
Definition: ibf.c:168
struct InvertibleBloomFilter * ibf_dup(const struct InvertibleBloomFilter *ibf)
Create a copy of an IBF, the copy has to be destroyed properly.
Definition: ibf.c:380
struct InvertibleBloomFilter * ibf_create(uint32_t size, uint8_t hash_num)
Create an invertible bloom filter.
Definition: ibf.c:80
void ibf_destroy(struct InvertibleBloomFilter *ibf)
Destroy all resources associated with the invertible bloom filter.
Definition: ibf.c:404
#define IBF_BUCKET_SIZE
Size of one ibf bucket in bytes.
Definition: ibf.h:72
char * getenv()
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static int ret
Final status code.
Definition: gnunet-arm.c:94
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:109
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static struct GNUNET_CADET_Handle * mh
Cadet handle.
Definition: gnunet-cadet.c:92
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_FS_Handle * ctx
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static char * name
Name (label) of the records to list.
static struct GNUNET_IDENTITY_EgoLookup * el
Handle to identity lookup.
static char * res
Currently read line or NULL on EOF.
static char * value
Value of the record to add/remove.
static uint32_t type
Type string converted to DNS type value.
static int status
The program status; 0 for success.
Definition: gnunet-nse.c:38
static struct GNUNET_CRYPTO_PowSalt salt
Salt for PoW calcualations.
Definition: gnunet-scrypt.c:34
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
UnionOperationPhase
Current phase we are in for a union operation.
unsigned int strata_estimator_difference(const struct StrataEstimator *se1, const struct StrataEstimator *se2)
Estimate set difference with two strata estimators, i.e.
struct StrataEstimator * strata_estimator_create(unsigned int strata_count, uint32_t ibf_size, uint8_t ibf_hashnum)
Create a new strata estimator with the given parameters.
void strata_estimator_destroy(struct StrataEstimator *se)
Destroy a strata estimator, free all of its resources.
int strata_estimator_read(const void *buf, size_t buf_len, int is_compressed, struct StrataEstimator *se)
Read strata from the buffer into the given strata estimator.
size_t strata_estimator_write(const struct StrataEstimator *se, void *buf)
Write the given strata estimator to the buffer.
void strata_estimator_insert(struct StrataEstimator *se, struct IBF_Key key)
Add a key to the strata estimator.
struct StrataEstimator * strata_estimator_dup(struct StrataEstimator *se)
Make a copy of a strata estimator.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
#define IBF_MIN_SIZE
Minimal size of an ibf Based on the bsc thesis of Elias Summermatter (2021)
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static void handle_client_create_set(void *cls, const struct GNUNET_SETU_CreateMessage *msg)
Called when a client wants to create a new set.
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static unsigned int get_next_ibf_size(float ibf_bucket_number_factor, unsigned int decoded_elements, unsigned int last_ibf_size)
#define MAX_IBF_SIZE
The maximum size of an ibf we use is MAX_IBF_SIZE=2^20.
#define MAX_BUCKETS_PER_MESSAGE
Number of buckets that can be transmitted in one message.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static void handle_union_p2p_request_full(void *cls, const struct TransmitFullMessage *msg)
static int check_union_p2p_request_full(void *cls, const struct TransmitFullMessage *mh)
Handle a request for full set transmission.
static int create_randomized_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Create randomized element hashmap for full sending.
static int decode_and_send(struct Operation *op)
Decode which elements are missing on each side, and send the appropriate offers and inquiries.
static int send_full_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Send a set element.
MESSAGE_CONTROL_FLOW_STATE
Different states to control the messages flow in differential mode.
@ MSG_CFS_EXPECTED
Track that receiving this message is expected.
@ MSG_CFS_SENT
Track that a message has been sent.
@ MSG_CFS_UNINITIALIZED
Initial message state.
@ MSG_CFS_RECEIVED
Track that message has been received.
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static void send_full_set(struct Operation *op)
Switch to full set transmission for op.
#define PROBABILITY_FOR_NEW_ROUND
Is the estimated probability for a new round this values is based on the bsc thesis of Elias Summerma...
static int check_byzantine_bounds(struct Operation *op)
Check if all given byzantine parameters are in given boundaries.
static void handle_client_accept(void *cls, const struct GNUNET_SETU_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void handle_union_p2p_over(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a over message from a remote peer.
static void handle_client_cancel(void *cls, const struct GNUNET_SETU_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void handle_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static void handle_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int check_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Check an IBF message from a remote peer.
static struct IBF_Key get_ibf_key(const struct GNUNET_HashCode *src)
Derive the IBF key from a hash code and a salt.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static void maybe_finish(struct Operation *op)
Tests if the operation is finished, and if so notify.
static void handle_client_reject(void *cls, const struct GNUNET_SETU_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static int init_key_to_element_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for initializing the key-to-element mapping of a union operation.
static int send_offers_iterator(void *cls, uint32_t key, void *value)
Iterator to send elements to a remote peer.
static void handle_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static int check_union_p2p_inquiry(void *cls, const struct InquiryMessage *msg)
Send offers (for GNUNET_Hash-es) in response to inquiries (for IBF_Key-s).
static int check_union_p2p_strata_estimator(void *cls, const struct StrataEstimatorMessage *msg)
Handle a strata estimator from a remote peer.
static int send_missing_full_elements_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static unsigned int get_size_from_difference(unsigned int diff, int number_buckets_per_element, float ibf_bucket_number_factor)
Compute the necessary order of an ibf from the size of the symmetric set difference.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
void send_offers_for_key(struct Operation *op, struct IBF_Key ibf_key)
Send offers (in the form of GNUNET_Hash-es) to the remote peer for the given IBF key.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static void handle_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Handle send full message received from other peer.
static void handle_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, enum MESSAGE_CONTROL_FLOW_STATE new_mcfs, const struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Function to update, track and validate message received in differential sync.
static void check_max_differential_rounds(struct Operation *op)
Limit active passive switches in differential sync to configured security level.
static void handle_union_p2p_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a done message from a remote peer.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void fail_union_operation(struct Operation *op)
Inform the client that the union operation has failed, and proceed to destroy the evaluate operation.
static int check_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Check offer (of struct GNUNET_HashCodes).
static void send_client_done(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void unsalt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Reverse modification done in the salt_key function.
static void handle_union_p2p_full_done(void *cls, const struct GNUNET_MessageHeader *mh)
Handle a "full done" message.
static void salt_key(const struct IBF_Key *k_in, uint32_t salt, struct IBF_Key *k_out)
Modify an IBF key k_in based on the salt, returning a salted key in k_out.
static int prepare_ibf(struct Operation *op, uint32_t size)
Create an ibf with the operation's elements of the specified size.
static void op_register_element(struct Operation *op, struct ElementEntry *ee, int received)
Insert an element into the union operation's key-to-element mapping.
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int determinate_avg_element_size_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining average size.
static int check_union_p2p_full_element(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check a full element message from a remote peer.
MESSAGE_TYPE
Message types to track in message control flow.
@ DEMAND_MESSAGE
Demand message type.
@ OFFER_MESSAGE
Offer message type.
@ ELEMENT_MESSAGE
Element message type.
static enum GNUNET_GenericReturnValue check_valid_phase(const uint8_t allowed_phases[], size_t size_phases, struct Operation *op)
Validates the if a message is received in a correct phase.
GNUNET_SERVICE_MAIN("set", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETU_ACCEPT, struct GNUNET_SETU_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETU_ADD, struct GNUNET_SETU_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETU_CREATE, struct GNUNET_SETU_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETU_EVALUATE, struct GNUNET_SETU_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETU_LISTEN, struct GNUNET_SETU_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETU_REJECT, struct GNUNET_SETU_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETU_CANCEL, struct GNUNET_SETU_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
static void handle_union_p2p_offer(void *cls, const struct GNUNET_MessageHeader *mh)
Handle offers (of struct GNUNET_HashCodes) and respond with demands (of struct GNUNET_HashCodes).
static void initialize_key_to_element(struct Operation *op)
Initialize the IBF key to element mapping local to this set operation.
static int check_union_p2p_demand(void *cls, const struct GNUNET_MessageHeader *mh)
Check a demand by the other peer for elements based on a list of struct GNUNET_HashCodes.
static enum GNUNET_GenericReturnValue free_values_iter(void *cls, const struct GNUNET_HashCode *key, void *value)
static int destroy_key_to_element_iter(void *cls, uint32_t key, void *value)
Iterator over hash map entries, called to destroy the linked list of colliding ibf key entries.
static void handle_union_p2p_ibf(void *cls, const struct IBFMessage *msg)
Handle an IBF message from a remote peer.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static int check_union_p2p_send_full(void *cls, const struct TransmitFullMessage *msg)
Check send full message received from other peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETU_ListenMessage *msg)
Called when a client wants to create a new listener.
#define SE_STRATA_COUNT
Number of IBFs in a strata estimator.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void full_sync_plausibility_check(struct Operation *op)
Function that checks if full sync is plausible.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static void handle_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Handle an element message from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_set_add(void *cls, const struct GNUNET_SETU_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
#define LOG(kind,...)
static int is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map, struct GNUNET_HashCode *hash_code, enum MESSAGE_TYPE mt)
Validate if a message in differential sync si already received before.
#define SECURITY_LEVEL
Security level used for byzantine checks (2^80)
static int send_ibf(struct Operation *op, uint32_t ibf_size)
Send an ibf of appropriate size.
MODE_OF_OPERATION
Different modes of operations.
@ DIFFERENTIAL_SYNC
Mode just synchronizes the difference between sets.
@ FULL_SYNC_LOCAL_SENDING_FIRST
Mode send full set sending local set first.
@ FULL_SYNC_REMOTE_SENDING_FIRST
Mode request full set from remote peer.
static struct KeyEntry * op_get_element(struct Operation *op, const struct GNUNET_HashCode *element_hash)
Determine whether the given element is already in the operation's element set.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
@ PHASE_FINISH_CLOSING
The protocol is almost finished, but we still have to flush our message queue and/or expect some elem...
@ PHASE_EXPECT_SE
We sent the request message, and expect a strata estimator.
@ PHASE_EXPECT_IBF_LAST
Continuation for multi part IBFs.
@ PHASE_FULL_RECEIVING
Phase that receives full set first and then sends elements that are the local peer missing.
@ PHASE_FINISH_WAITING
In the penultimate phase, we wait until all our demands are satisfied.
@ PHASE_FINISHED
In the ultimate phase, we wait until our demands are satisfied and then quit (sending another DONE me...
@ PHASE_PASSIVE_DECODING
The other peer is decoding the IBF we just sent.
@ PHASE_ACTIVE_DECODING
We are decoding an IBF.
@ PHASE_FULL_SENDING
After sending the full set, wait for responses with the elements that the local peer is missing.
@ PHASE_EXPECT_IBF
We sent the strata estimator, and expect an IBF.
static uint8_t estimate_best_mode_of_operation(uint64_t avg_element_size, uint64_t local_set_size, uint64_t remote_set_size, uint64_t est_set_diff_remote, uint64_t est_set_diff_local, uint64_t bandwith_latency_tradeoff, uint64_t ibf_bucket_number_factor)
Function that chooses the optimal mode of operation depending on operation parameters.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void send_client_element(struct Operation *op, const struct GNUNET_SETU_Element *element, enum GNUNET_SETU_Status status)
Send a result message to the client indicating that there is a new element.
static int determinate_done_message_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator for determining if all demands have been satisfied.
#define SE_IBF_HASH_NUM
The hash num parameter for the difference digests and strata estimators.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
#define SE_IBFS_TOTAL_SIZE
Primes for all 4 different strata estimators 61,67,71,73,79,83,89,97 348 Based on the bsc thesis of E...
#define DIFFERENTIAL_RTT_MEAN
AVG RTT for differential sync when k=2 and Factor = 2 Based on the bsc thesis of Elias Summermatter (...
static int op_get_element_iterator(void *cls, uint32_t key, void *value)
Iterator over the mapping from IBF keys to element entries.
static int prepare_ibf_iterator(void *cls, uint32_t key, void *value)
Insert a key into an ibf.
static int check_union_p2p_elements(void *cls, const struct GNUNET_SETU_ElementMessage *emsg)
Check an element message from a remote peer.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static int check_client_evaluate(void *cls, const struct GNUNET_SETU_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
uint8_t determine_strata_count(uint64_t avg_element_size, uint64_t element_count)
Calculates the optimal number of strata Estimators to send.
static unsigned int ibf_size
Constants for network applications operating on top of the CADET service.
CADET service; establish channels to distant peers.
Constants for network protocols.
Two-peer set union operations.
API to create, modify and access statistics.
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected channel.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
struct GNUNET_CONFIGURATION_Handle * GNUNET_CONFIGURATION_create(void)
Create a new configuration object.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_load(struct GNUNET_CONFIGURATION_Handle *cfg, const char *filename)
Load configuration.
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Generate a random unsigned 64-bit value.
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:221
enum GNUNET_GenericReturnValue GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:70
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_contains(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Check if the map contains any value under the given key (including values that are NULL).
int GNUNET_CONTAINER_multihashmap32_get_multiple(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap32_put(struct GNUNET_CONTAINER_MultiHashMap32 *map, uint32_t key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
struct GNUNET_CONTAINER_MultiHashMap32 * GNUNET_CONTAINER_multihashmap32_create(unsigned int len)
Create a 32-bit key multi hash map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
unsigned int GNUNET_CONTAINER_multihashmap32_size(const struct GNUNET_CONTAINER_MultiHashMap32 *map)
Get the number of key-value pairs in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap32_destroy(struct GNUNET_CONTAINER_MultiHashMap32 *map)
Destroy a 32-bit key hash map.
int GNUNET_CONTAINER_multihashmap32_iterate(struct GNUNET_CONTAINER_MultiHashMap32 *map, GNUNET_CONTAINER_MultiHashMapIterator32Callback it, void *it_cls)
Iterate over all entries in the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST
, ' bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
If a value with the given key exists, replace it.
#define GNUNET_log(kind,...)
void GNUNET_CRYPTO_hash_context_read(struct GNUNET_HashContext *hc, const void *buf, size_t size)
Add data to be hashed.
Definition: crypto_hash.c:366
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:54
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:37
void GNUNET_CRYPTO_hash_context_finish(struct GNUNET_HashContext *hc, struct GNUNET_HashCode *r_hash)
Finish the hash computation.
Definition: crypto_hash.c:390
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
struct GNUNET_HashContext * GNUNET_CRYPTO_hash_context_start(void)
Start incremental hashing operation.
Definition: crypto_hash.c:350
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.