GNUnet 0.22.0
gnunet-service-seti.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2013-2017, 2020 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
26#include "platform.h"
30#include "gnunet_seti_service.h"
31#include "gnunet_block_lib.h"
32#include "seti.h"
33
38#define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
39
40
45{
50
56
63
69
75
81};
82
83
87struct Set;
88
95struct ElementEntry;
96
100struct Operation;
101
102
109struct ElementEntry
110{
116
122
126 unsigned int generation_added;
127
132 int remote;
133};
134
135
140struct Listener;
141
142
146struct ClientState
147{
151 struct Set *set;
152
156 struct Listener *listener;
157
162
166 struct GNUNET_MQ_Handle *mq;
167};
168
169
173struct Operation
174{
180
187
194
198 struct Operation *next;
199
203 struct Operation *prev;
204
209
213 struct Listener *listener;
214
218 struct GNUNET_MQ_Handle *mq;
219
224
229 struct Set *set;
230
235
240
246
251
256 char *bf_data;
257
263
268
274
278 uint32_t bf_data_size;
279
284
289 uint32_t salt;
290
295
299 unsigned int generation_created;
300
305
312
316 uint32_t remote_element_count;
317
321 uint32_t client_request_id;
322
327
333 uint32_t suggest_id;
334
335};
336
337
342struct SetContent
343{
348
352 unsigned int refcount;
353
357 unsigned int latest_generation;
358
362 int iterator_count;
363};
364
365
369struct Set
370{
374 struct Set *next;
375
379 struct Set *prev;
380
385 struct ClientState *cs;
386
391 struct SetContent *content;
392
398
402 struct Operation *ops_head;
403
407 struct Operation *ops_tail;
408
413 unsigned int current_generation;
414
415};
416
417
422struct Listener
423{
427 struct Listener *next;
428
432 struct Listener *prev;
433
439 struct Operation *op_head;
440
446 struct Operation *op_tail;
447
452 struct ClientState *cs;
453
458
463 struct GNUNET_HashCode app_id;
464
465};
466
467
473
478
482static struct Listener *listener_head;
483
487static struct Listener *listener_tail;
488
492static unsigned int num_clients;
493
498static int in_shutdown;
499
506static uint32_t suggest_id;
507
508
516static void
518 struct GNUNET_SETI_Element *element)
519{
520 struct GNUNET_MQ_Envelope *ev;
521 struct GNUNET_SETI_ResultMessage *rm;
522
523 if (GNUNET_YES == op->return_intersection)
524 {
525 GNUNET_break (0);
526 return; /* Wrong mode for transmitting removed elements */
527 }
529 "Sending removed element (size %u) to client\n",
530 element->size);
532 "# Element removed messages sent",
533 1,
534 GNUNET_NO);
535 GNUNET_assert (0 != op->client_request_id);
536 ev = GNUNET_MQ_msg_extra (rm,
537 element->size,
539 if (NULL == ev)
540 {
541 GNUNET_break (0);
542 return;
543 }
545 rm->request_id = htonl (op->client_request_id);
546 rm->element_type = element->element_type;
547 GNUNET_memcpy (&rm[1],
548 element->data,
549 element->size);
550 GNUNET_MQ_send (op->set->cs->mq,
551 ev);
552}
553
554
562static int
564 struct Operation *op)
565{
566 return op->generation_created >= ee->generation_added;
567}
568
569
578static int
580 const struct GNUNET_HashCode *key,
581 void *value)
582{
583 struct Operation *op = cls;
584 struct ElementEntry *ee = value;
585 struct GNUNET_HashCode mutated_hash;
586
588 "FIMA called for %s:%u\n",
590 ee->element.size);
591
593 {
595 "Reduced initialization, not starting with %s:%u (wrong generation)\n",
597 ee->element.size);
598 return GNUNET_YES; /* element not valid in our operation's generation */
599 }
600
601 /* Test if element is in other peer's bloomfilter */
603 op->salt,
604 &mutated_hash);
606 "Testing mingled hash %s with salt %u\n",
607 GNUNET_h2s (&mutated_hash),
608 op->salt);
609 if (GNUNET_NO ==
611 &mutated_hash))
612 {
613 /* remove this element */
615 &ee->element);
617 "Reduced initialization, not starting with %s:%u\n",
619 ee->element.size);
620 return GNUNET_YES;
621 }
622 op->my_element_count++;
623 GNUNET_CRYPTO_hash_xor (&op->my_xor,
624 &ee->element_hash,
625 &op->my_xor);
627 "Filtered initialization of my_elements, adding %s:%u\n",
629 ee->element.size);
632 &ee->element_hash,
633 ee,
635
636 return GNUNET_YES;
637}
638
639
649static int
651 const struct GNUNET_HashCode *key,
652 void *value)
653{
654 struct Operation *op = cls;
655 struct ElementEntry *ee = value;
656 struct GNUNET_HashCode mutated_hash;
657
659 op->salt,
660 &mutated_hash);
662 "Testing mingled hash %s with salt %u\n",
663 GNUNET_h2s (&mutated_hash),
664 op->salt);
665 if (GNUNET_NO ==
667 &mutated_hash))
668 {
669 GNUNET_break (0 < op->my_element_count);
670 op->my_element_count--;
671 GNUNET_CRYPTO_hash_xor (&op->my_xor,
672 &ee->element_hash,
673 &op->my_xor);
675 "Bloom filter reduction of my_elements, removing %s:%u\n",
677 ee->element.size);
680 &ee->element_hash,
681 ee));
683 &ee->element);
684 }
685 else
686 {
688 "Bloom filter reduction of my_elements, keeping %s:%u\n",
690 ee->element.size);
691 }
692 return GNUNET_YES;
693}
694
695
704static int
706 const struct GNUNET_HashCode *key,
707 void *value)
708{
709 struct Operation *op = cls;
710 struct ElementEntry *ee = value;
711 struct GNUNET_HashCode mutated_hash;
712
714 op->salt,
715 &mutated_hash);
717 "Initializing BF with hash %s with salt %u\n",
718 GNUNET_h2s (&mutated_hash),
719 op->salt);
721 &mutated_hash);
722 return GNUNET_YES;
723}
724
725
738static void
740{
741 struct Set *set = op->set;
742 struct GNUNET_CADET_Channel *channel;
743
744 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
745 GNUNET_assert (NULL == op->listener);
746 if (NULL != op->remote_bf)
747 {
749 op->remote_bf = NULL;
750 }
751 if (NULL != op->local_bf)
752 {
754 op->local_bf = NULL;
755 }
756 if (NULL != op->my_elements)
757 {
759 op->my_elements = NULL;
760 }
761 if (NULL != op->full_result_iter)
762 {
764 op->full_result_iter);
765 op->full_result_iter = NULL;
766 }
768 "Destroying intersection op state done\n");
769 if (NULL != set)
770 {
772 set->ops_tail,
773 op);
774 op->set = NULL;
775 }
776 if (NULL != op->context_msg)
777 {
778 GNUNET_free (op->context_msg);
779 op->context_msg = NULL;
780 }
781 if (NULL != (channel = op->channel))
782 {
783 /* This will free op; called conditionally as this helper function
784 is also called from within the channel disconnect handler. */
785 op->channel = NULL;
787 }
788 /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
789 * there was a channel end handler that will free 'op' on the call stack. */
790}
791
792
798static void
800
801
807static void
809{
810 struct Listener *listener;
811
813 "Destroying incoming operation %p\n",
814 op);
815 if (NULL != (listener = op->listener))
816 {
818 listener->op_tail,
819 op);
820 op->listener = NULL;
821 }
822 if (NULL != op->timeout_task)
823 {
824 GNUNET_SCHEDULER_cancel (op->timeout_task);
825 op->timeout_task = NULL;
826 }
828}
829
830
837static void
839{
840 struct Operation *op = cls;
841 struct GNUNET_MQ_Envelope *ev;
842 struct GNUNET_SETI_ResultMessage *rm;
843
845 "Intersection succeeded, sending DONE to local client\n");
847 "# Intersection operations succeeded",
848 1,
849 GNUNET_NO);
850 ev = GNUNET_MQ_msg (rm,
852 rm->request_id = htonl (op->client_request_id);
854 rm->element_type = htons (0);
855 GNUNET_MQ_send (op->set->cs->mq,
856 ev);
858}
859
860
866static void
868{
869 struct GNUNET_CADET_Channel *channel;
870
872 "channel_end_cb called\n");
873 if (NULL != (channel = op->channel))
874 {
875 /* This will free op; called conditionally as this helper function
876 is also called from within the channel disconnect handler. */
877 op->channel = NULL;
879 }
880 if (NULL != op->listener)
881 {
883 return;
884 }
885 if (NULL != op->set)
886 {
887 if (GNUNET_YES == op->channel_death_expected)
888 {
889 /* oh goodie, we are done! */
891 }
892 else
893 {
894 /* sorry, channel went down early, too bad. */
896 }
897 }
898 else
900 GNUNET_free (op);
901}
902
903
910static void
912{
913 struct GNUNET_MQ_Envelope *ev;
915
917 "Intersection operation failed\n");
919 "# Intersection operations failed",
920 1,
921 GNUNET_NO);
922 if (NULL != op->my_elements)
923 {
925 op->my_elements = NULL;
926 }
927 ev = GNUNET_MQ_msg (msg,
929 msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
930 msg->request_id = htonl (op->client_request_id);
931 msg->element_type = htons (0);
932 GNUNET_MQ_send (op->set->cs->mq,
933 ev);
935}
936
937
944static void
946{
947 struct GNUNET_MQ_Envelope *ev;
948 struct BFMessage *msg;
949 uint32_t bf_size;
950 uint32_t bf_elementbits;
951 uint32_t chunk_size;
952 char *bf_data;
953 uint32_t offset;
954
955 /* We consider the ratio of the set sizes to determine
956 the number of bits per element, as the smaller set
957 should use more bits to maximize its set reduction
958 potential and minimize overall bandwidth consumption. */
959 bf_elementbits = 2 + ceil (log2 ((double)
960 (op->remote_element_count
961 / (double) op->my_element_count)));
962 if (bf_elementbits < 1)
963 bf_elementbits = 1; /* make sure k is not 0 */
964 /* optimize BF-size to ~50% of bits set */
965 bf_size = ceil ((double) (op->my_element_count
966 * bf_elementbits / log (2)));
968 "Sending Bloom filter (%u) of size %u bytes\n",
969 (unsigned int) bf_elementbits,
970 (unsigned int) bf_size);
971 op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
972 bf_size,
973 bf_elementbits);
975 UINT32_MAX);
978 op);
979
980 /* send our Bloom filter */
982 "# Intersection Bloom filters sent",
983 1,
984 GNUNET_NO);
985 chunk_size = 60 * 1024 - sizeof(struct BFMessage);
986 if (bf_size <= chunk_size)
987 {
988 /* singlepart */
989 chunk_size = bf_size;
991 chunk_size,
995 op->local_bf,
996 (char *) &msg[1],
997 bf_size));
998 msg->sender_element_count = htonl (op->my_element_count);
999 msg->bloomfilter_total_length = htonl (bf_size);
1000 msg->bits_per_element = htonl (bf_elementbits);
1001 msg->sender_mutator = htonl (op->salt);
1002 msg->element_xor_hash = op->my_xor;
1003 GNUNET_MQ_send (op->mq, ev);
1004 }
1005 else
1006 {
1007 /* multipart */
1008 bf_data = GNUNET_malloc (bf_size);
1011 op->local_bf,
1012 bf_data,
1013 bf_size));
1014 offset = 0;
1015 while (offset < bf_size)
1016 {
1017 if (bf_size - chunk_size < offset)
1018 chunk_size = bf_size - offset;
1020 chunk_size,
1022 GNUNET_memcpy (&msg[1],
1023 &bf_data[offset],
1024 chunk_size);
1025 offset += chunk_size;
1026 msg->sender_element_count = htonl (op->my_element_count);
1027 msg->bloomfilter_total_length = htonl (bf_size);
1028 msg->bits_per_element = htonl (bf_elementbits);
1029 msg->sender_mutator = htonl (op->salt);
1030 msg->element_xor_hash = op->my_xor;
1031 GNUNET_MQ_send (op->mq, ev);
1032 }
1033 GNUNET_free (bf_data);
1034 }
1036 op->local_bf = NULL;
1037}
1038
1039
1048static void
1050{
1051 struct Operation *op = cls;
1052
1054 "DONE sent to other peer, now waiting for other end to close the channel\n");
1055 op->phase = PHASE_FINISHED;
1056 op->channel_death_expected = GNUNET_YES;
1057}
1058
1059
1067static void
1069{
1070 struct GNUNET_MQ_Envelope *ev;
1071 struct IntersectionDoneMessage *idm;
1072
1074 GNUNET_assert (GNUNET_NO == op->channel_death_expected);
1075 ev = GNUNET_MQ_msg (idm,
1077 idm->final_element_count = htonl (op->my_element_count);
1078 idm->element_xor_hash = op->my_xor;
1081 op);
1082 GNUNET_MQ_send (op->mq,
1083 ev);
1084}
1085
1086
1092static void
1094{
1095 struct Operation *op = cls;
1096 const void *nxt;
1097 const struct ElementEntry *ee;
1098 struct GNUNET_MQ_Envelope *ev;
1099 struct GNUNET_SETI_ResultMessage *rm;
1100 const struct GNUNET_SETI_Element *element;
1101 int res;
1102
1103 if (GNUNET_NO == op->return_intersection)
1104 {
1105 GNUNET_break (0);
1106 return; /* Wrong mode for transmitting removed elements */
1107 }
1109 op->full_result_iter,
1110 NULL,
1111 &nxt);
1112 if (GNUNET_NO == res)
1113 {
1115 "Sending done and destroy because iterator ran out\n");
1117 op->full_result_iter);
1118 op->full_result_iter = NULL;
1119 if (PHASE_DONE_RECEIVED == op->phase)
1120 {
1121 op->phase = PHASE_FINISHED;
1123 }
1124 else if (PHASE_MUST_SEND_DONE == op->phase)
1125 {
1126 send_p2p_done (op);
1127 }
1128 else
1129 {
1130 GNUNET_assert (0);
1131 }
1132 return;
1133 }
1134 ee = nxt;
1135 element = &ee->element;
1137 "Sending element %s:%u to client (full set)\n",
1138 GNUNET_h2s (&ee->element_hash),
1139 element->size);
1140 GNUNET_assert (0 != op->client_request_id);
1141 ev = GNUNET_MQ_msg_extra (rm,
1142 element->size,
1144 GNUNET_assert (NULL != ev);
1146 rm->request_id = htonl (op->client_request_id);
1147 rm->element_type = element->element_type;
1148 GNUNET_memcpy (&rm[1],
1149 element->data,
1150 element->size);
1153 op);
1154 GNUNET_MQ_send (op->set->cs->mq,
1155 ev);
1156}
1157
1158
1168static int
1170 const struct GNUNET_HashCode *key,
1171 void *value)
1172{
1173 struct ElementEntry *ee = value;
1174 struct Operation *op = cls;
1175
1177 return GNUNET_YES; /* element not live in operation's generation */
1178 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1179 &ee->element_hash,
1180 &op->my_xor);
1182 "Initial full initialization of my_elements, adding %s:%u\n",
1183 GNUNET_h2s (&ee->element_hash),
1184 ee->element.size);
1187 &ee->element_hash,
1188 ee,
1190 return GNUNET_YES;
1191}
1192
1193
1200static void
1202{
1203 struct GNUNET_MQ_Envelope *ev;
1205
1207 "Sending our element count (%u)\n",
1208 op->my_element_count);
1209 ev = GNUNET_MQ_msg (msg,
1211 msg->sender_element_count = htonl (op->my_element_count);
1212 GNUNET_MQ_send (op->mq, ev);
1213}
1214
1215
1222static void
1224{
1225 op->phase = PHASE_BF_EXCHANGE;
1226 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1228 op);
1230}
1231
1232
1240static void
1242 const struct
1244{
1245 struct Operation *op = cls;
1246
1247 op->remote_element_count = ntohl (msg->sender_element_count);
1249 "Received remote element count (%u), I have %u\n",
1250 op->remote_element_count,
1251 op->my_element_count);
1252 if (((PHASE_INITIAL != op->phase) &&
1253 (PHASE_COUNT_SENT != op->phase)) ||
1254 (op->my_element_count > op->remote_element_count) ||
1255 (0 == op->my_element_count) ||
1256 (0 == op->remote_element_count))
1257 {
1258 GNUNET_break_op (0);
1260 return;
1261 }
1262 GNUNET_break (NULL == op->remote_bf);
1264 GNUNET_CADET_receive_done (op->channel);
1265}
1266
1267
1273static void
1275{
1277 "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1278 op->phase,
1279 op->remote_element_count,
1280 op->my_element_count,
1281 GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1282 switch (op->phase)
1283 {
1284 case PHASE_INITIAL:
1285 GNUNET_break_op (0);
1287 return;
1288 case PHASE_COUNT_SENT:
1289 /* This is the first BF being sent, build our initial map with
1290 filtering in place */
1291 op->my_element_count = 0;
1292 GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1294 op);
1295 break;
1296 case PHASE_BF_EXCHANGE:
1297 /* Update our set by reduction */
1300 op);
1301 break;
1303 GNUNET_break_op (0);
1305 return;
1307 GNUNET_break_op (0);
1309 return;
1310 case PHASE_FINISHED:
1311 GNUNET_break_op (0);
1313 return;
1314 }
1316 op->remote_bf = NULL;
1317
1318 if ((0 == op->my_element_count) || /* fully disjoint */
1319 ((op->my_element_count == op->remote_element_count) &&
1320 (0 == GNUNET_memcmp (&op->my_xor,
1321 &op->other_xor))))
1322 {
1323 /* we are done */
1324 op->phase = PHASE_MUST_SEND_DONE;
1326 "Intersection succeeded, sending DONE to other peer\n");
1328 op->local_bf = NULL;
1329 if (GNUNET_YES == op->return_intersection)
1330 {
1332 "Sending full result set (%u elements)\n",
1333 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1334 op->full_result_iter
1336 op->my_elements);
1338 return;
1339 }
1340 send_p2p_done (op);
1341 return;
1342 }
1343 op->phase = PHASE_BF_EXCHANGE;
1345}
1346
1347
1355static int
1357 const struct BFMessage *msg)
1358{
1359 struct Operation *op = cls;
1360
1361 (void) op;
1362 return GNUNET_OK;
1363}
1364
1365
1372static void
1374 const struct BFMessage *msg)
1375{
1376 struct Operation *op = cls;
1377 uint32_t bf_size;
1378 uint32_t chunk_size;
1379 uint32_t bf_bits_per_element;
1380
1381 switch (op->phase)
1382 {
1383 case PHASE_INITIAL:
1384 GNUNET_break_op (0);
1386 return;
1387
1388 case PHASE_COUNT_SENT:
1389 case PHASE_BF_EXCHANGE:
1390 bf_size = ntohl (msg->bloomfilter_total_length);
1391 bf_bits_per_element = ntohl (msg->bits_per_element);
1392 chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1393 op->other_xor = msg->element_xor_hash;
1394 if (bf_size == chunk_size)
1395 {
1396 if (NULL != op->bf_data)
1397 {
1398 GNUNET_break_op (0);
1400 return;
1401 }
1402 /* single part, done here immediately */
1403 op->remote_bf
1404 = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1405 bf_size,
1406 bf_bits_per_element);
1407 op->salt = ntohl (msg->sender_mutator);
1408 op->remote_element_count = ntohl (msg->sender_element_count);
1409 process_bf (op);
1410 break;
1411 }
1412 /* multipart chunk */
1413 if (NULL == op->bf_data)
1414 {
1415 /* first chunk, initialize */
1416 op->bf_data = GNUNET_malloc (bf_size);
1417 op->bf_data_size = bf_size;
1418 op->bf_bits_per_element = bf_bits_per_element;
1419 op->bf_data_offset = 0;
1420 op->salt = ntohl (msg->sender_mutator);
1421 op->remote_element_count = ntohl (msg->sender_element_count);
1422 }
1423 else
1424 {
1425 /* increment */
1426 if ((op->bf_data_size != bf_size) ||
1427 (op->bf_bits_per_element != bf_bits_per_element) ||
1428 (op->bf_data_offset + chunk_size > bf_size) ||
1429 (op->salt != ntohl (msg->sender_mutator)) ||
1430 (op->remote_element_count != ntohl (msg->sender_element_count)))
1431 {
1432 GNUNET_break_op (0);
1434 return;
1435 }
1436 }
1437 GNUNET_memcpy (&op->bf_data[op->bf_data_offset],
1438 (const char *) &msg[1],
1439 chunk_size);
1440 op->bf_data_offset += chunk_size;
1441 if (op->bf_data_offset == bf_size)
1442 {
1443 /* last chunk, run! */
1444 op->remote_bf
1446 bf_size,
1447 bf_bits_per_element);
1448 GNUNET_free (op->bf_data);
1449 op->bf_data = NULL;
1450 op->bf_data_size = 0;
1451 process_bf (op);
1452 }
1453 break;
1454
1455 default:
1456 GNUNET_break_op (0);
1458 return;
1459 }
1460 GNUNET_CADET_receive_done (op->channel);
1461}
1462
1463
1472static int
1473filter_all (void *cls,
1474 const struct GNUNET_HashCode *key,
1475 void *value)
1476{
1477 struct Operation *op = cls;
1478 struct ElementEntry *ee = value;
1479
1480 GNUNET_break (0 < op->my_element_count);
1481 op->my_element_count--;
1482 GNUNET_CRYPTO_hash_xor (&op->my_xor,
1483 &ee->element_hash,
1484 &op->my_xor);
1486 "Final reduction of my_elements, removing %s:%u\n",
1487 GNUNET_h2s (&ee->element_hash),
1488 ee->element.size);
1491 &ee->element_hash,
1492 ee));
1494 &ee->element);
1495 return GNUNET_YES;
1496}
1497
1498
1505static void
1507 const struct IntersectionDoneMessage *idm)
1508{
1509 struct Operation *op = cls;
1510
1511 if (PHASE_BF_EXCHANGE != op->phase)
1512 {
1513 /* wrong phase to conclude? FIXME: Or should we allow this
1514 if the other peer has _initially_ already an empty set? */
1515 GNUNET_break_op (0);
1517 return;
1518 }
1519 if (0 == ntohl (idm->final_element_count))
1520 {
1521 /* other peer determined empty set is the intersection,
1522 remove all elements */
1524 &filter_all,
1525 op);
1526 }
1527 if ((op->my_element_count != ntohl (idm->final_element_count)) ||
1528 (0 != GNUNET_memcmp (&op->my_xor,
1529 &idm->element_xor_hash)))
1530 {
1531 /* Other peer thinks we are done, but we disagree on the result! */
1532 GNUNET_break_op (0);
1534 return;
1535 }
1537 "Got IntersectionDoneMessage, have %u elements in intersection\n",
1538 op->my_element_count);
1539 op->phase = PHASE_DONE_RECEIVED;
1540 GNUNET_CADET_receive_done (op->channel);
1541
1542 GNUNET_assert (GNUNET_NO == op->client_done_sent);
1543 if (GNUNET_YES == op->return_intersection)
1544 {
1546 "Sending full result set to client (%u elements)\n",
1547 GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1548 op->full_result_iter
1551 return;
1552 }
1553 op->phase = PHASE_FINISHED;
1555}
1556
1557
1566static struct Operation *
1567get_incoming (uint32_t id)
1568{
1569 for (struct Listener *listener = listener_head; NULL != listener;
1571 {
1572 for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1573 if (op->suggest_id == id)
1574 return op;
1575 }
1576 return NULL;
1577}
1578
1579
1588static void *
1590 struct GNUNET_SERVICE_Client *c,
1591 struct GNUNET_MQ_Handle *mq)
1592{
1593 struct ClientState *cs;
1594
1595 num_clients++;
1596 cs = GNUNET_new (struct ClientState);
1597 cs->client = c;
1598 cs->mq = mq;
1599 return cs;
1600}
1601
1602
1611static int
1613 const struct GNUNET_HashCode *key,
1614 void *value)
1615{
1616 struct ElementEntry *ee = value;
1617
1618 GNUNET_free (ee);
1619 return GNUNET_YES;
1620}
1621
1622
1630static void
1632 struct GNUNET_SERVICE_Client *client,
1633 void *internal_cls)
1634{
1635 struct ClientState *cs = internal_cls;
1636 struct Operation *op;
1637 struct Listener *listener;
1638 struct Set *set;
1639
1640 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
1641 if (NULL != (set = cs->set))
1642 {
1643 struct SetContent *content = set->content;
1644
1645 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
1646 /* Destroy pending set operations */
1647 while (NULL != set->ops_head)
1649
1650 /* free set content (or at least decrement RC) */
1651 set->content = NULL;
1652 GNUNET_assert (0 != content->refcount);
1653 content->refcount--;
1654 if (0 == content->refcount)
1655 {
1656 GNUNET_assert (NULL != content->elements);
1659 NULL);
1661 content->elements = NULL;
1662 GNUNET_free (content);
1663 }
1664 GNUNET_free (set);
1665 }
1666
1667 if (NULL != (listener = cs->listener))
1668 {
1669 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
1671 listener->open_port = NULL;
1672 while (NULL != (op = listener->op_head))
1673 {
1675 "Destroying incoming operation `%u' from peer `%s'\n",
1676 (unsigned int) op->client_request_id,
1677 GNUNET_i2s (&op->peer));
1679 }
1681 GNUNET_free (listener);
1682 }
1683 GNUNET_free (cs);
1684 num_clients--;
1685 if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
1686 {
1687 if (NULL != cadet)
1688 {
1690 cadet = NULL;
1691 }
1692 }
1693}
1694
1695
1704static int
1706 const struct OperationRequestMessage *msg)
1707{
1708 struct Operation *op = cls;
1709 struct Listener *listener = op->listener;
1710 const struct GNUNET_MessageHeader *nested_context;
1711
1712 /* double operation request */
1713 if (0 != op->suggest_id)
1714 {
1715 GNUNET_break_op (0);
1716 return GNUNET_SYSERR;
1717 }
1718 /* This should be equivalent to the previous condition, but can't hurt to check twice */
1719 if (NULL == listener)
1720 {
1721 GNUNET_break (0);
1722 return GNUNET_SYSERR;
1723 }
1724 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1725 if ((NULL != nested_context) &&
1726 (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE))
1727 {
1728 GNUNET_break_op (0);
1729 return GNUNET_SYSERR;
1730 }
1731 return GNUNET_OK;
1732}
1733
1734
1752static void
1754 const struct OperationRequestMessage *msg)
1755{
1756 struct Operation *op = cls;
1757 struct Listener *listener = op->listener;
1758 const struct GNUNET_MessageHeader *nested_context;
1759 struct GNUNET_MQ_Envelope *env;
1760 struct GNUNET_SETI_RequestMessage *cmsg;
1761
1762 nested_context = GNUNET_MQ_extract_nested_mh (msg);
1763 /* Make a copy of the nested_context (application-specific context
1764 information that is opaque to set) so we can pass it to the
1765 listener later on */
1766 if (NULL != nested_context)
1767 op->context_msg = GNUNET_copy_message (nested_context);
1768 op->remote_element_count = ntohl (msg->element_count);
1769 GNUNET_log (
1771 "Received P2P operation request (port %s) for active listener\n",
1772 GNUNET_h2s (&op->listener->app_id));
1773 GNUNET_assert (0 == op->suggest_id);
1774 if (0 == suggest_id)
1775 suggest_id++;
1776 op->suggest_id = suggest_id++;
1777 GNUNET_assert (NULL != op->timeout_task);
1778 GNUNET_SCHEDULER_cancel (op->timeout_task);
1779 op->timeout_task = NULL;
1782 op->context_msg);
1783 GNUNET_log (
1785 "Suggesting incoming request with accept id %u to listener %p of client %p\n",
1786 op->suggest_id,
1787 listener,
1788 listener->cs);
1789 cmsg->accept_id = htonl (op->suggest_id);
1790 cmsg->peer_id = op->peer;
1791 GNUNET_MQ_send (listener->cs->mq, env);
1792 /* NOTE: GNUNET_CADET_receive_done() will be called in
1793 #handle_client_accept() */
1794}
1795
1796
1805static void
1807 const struct GNUNET_SETI_CreateMessage *msg)
1808{
1809 struct ClientState *cs = cls;
1810 struct Set *set;
1811
1813 "Client created new intersection set\n");
1814 if (NULL != cs->set)
1815 {
1816 /* There can only be one set per client */
1817 GNUNET_break (0);
1819 return;
1820 }
1821 set = GNUNET_new (struct Set);
1822 set->content = GNUNET_new (struct SetContent);
1823 set->content->refcount = 1;
1825 GNUNET_YES);
1826 set->cs = cs;
1827 cs->set = set;
1829}
1830
1831
1841static void
1843{
1844 struct Operation *op = cls;
1845
1846 op->timeout_task = NULL;
1848 "Remote peer's incoming request timed out\n");
1850}
1851
1852
1869static void *
1872 const struct GNUNET_PeerIdentity *source)
1873{
1874 struct Listener *listener = cls;
1875 struct Operation *op;
1876
1878 "New incoming channel\n");
1879 op = GNUNET_new (struct Operation);
1880 op->listener = listener;
1881 op->peer = *source;
1882 op->channel = channel;
1883 op->mq = GNUNET_CADET_get_mq (op->channel);
1885 UINT32_MAX);
1888 op);
1891 op);
1892 return op;
1893}
1894
1895
1912static void
1913channel_end_cb (void *channel_ctx,
1914 const struct GNUNET_CADET_Channel *channel)
1915{
1916 struct Operation *op = channel_ctx;
1917
1918 op->channel = NULL;
1920}
1921
1922
1937static void
1939 const struct GNUNET_CADET_Channel *channel,
1940 int window_size)
1941{
1942 /* FIXME: not implemented, we could do flow control here... */
1943}
1944
1945
1952static void
1954 const struct GNUNET_SETI_ListenMessage *msg)
1955{
1956 struct ClientState *cs = cls;
1957 struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1958 GNUNET_MQ_hd_var_size (incoming_msg,
1961 NULL),
1962 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1965 NULL),
1966 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1968 struct BFMessage,
1969 NULL),
1970 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1973 NULL),
1975 };
1976 struct Listener *listener;
1977
1978 if (NULL != cs->listener)
1979 {
1980 /* max. one active listener per client! */
1981 GNUNET_break (0);
1983 return;
1984 }
1985 listener = GNUNET_new (struct Listener);
1986 listener->cs = cs;
1987 cs->listener = listener;
1988 listener->app_id = msg->app_id;
1991 listener);
1993 "New listener for set intersection created (port %s)\n",
1994 GNUNET_h2s (&listener->app_id));
1996 &msg->app_id,
1998 listener,
2001 cadet_handlers);
2003}
2004
2005
2013static void
2015 const struct GNUNET_SETI_RejectMessage *msg)
2016{
2017 struct ClientState *cs = cls;
2018 struct Operation *op;
2019
2020 op = get_incoming (ntohl (msg->accept_reject_id));
2021 if (NULL == op)
2022 {
2023 /* no matching incoming operation for this reject;
2024 could be that the other peer already disconnected... */
2026 "Client rejected unknown operation %u\n",
2027 (unsigned int) ntohl (msg->accept_reject_id));
2029 return;
2030 }
2032 "Peer request (app %s) rejected by client\n",
2033 GNUNET_h2s (&cs->listener->app_id));
2036}
2037
2038
2045static int
2047 const struct GNUNET_SETI_ElementMessage *msg)
2048{
2049 /* NOTE: Technically, we should probably check with the
2050 block library whether the element we are given is well-formed */
2051 return GNUNET_OK;
2052}
2053
2054
2061static void
2063 const struct GNUNET_SETI_ElementMessage *msg)
2064{
2065 struct ClientState *cs = cls;
2066 struct Set *set;
2067 struct GNUNET_SETI_Element el;
2068 struct ElementEntry *ee;
2069 struct GNUNET_HashCode hash;
2070
2071 if (NULL == (set = cs->set))
2072 {
2073 /* client without a set requested an operation */
2074 GNUNET_break (0);
2076 return;
2077 }
2079 el.size = ntohs (msg->header.size) - sizeof(*msg);
2080 el.data = &msg[1];
2081 el.element_type = ntohs (msg->element_type);
2083 &hash);
2085 &hash);
2086 if (NULL == ee)
2087 {
2089 "Client inserts element %s of size %u\n",
2090 GNUNET_h2s (&hash),
2091 el.size);
2092 ee = GNUNET_malloc (el.size + sizeof(*ee));
2093 ee->element.size = el.size;
2094 GNUNET_memcpy (&ee[1], el.data, el.size);
2095 ee->element.data = &ee[1];
2096 ee->element.element_type = el.element_type;
2097 ee->remote = GNUNET_NO;
2098 ee->element_hash = hash;
2101 set->content->elements,
2102 &ee->element_hash,
2103 ee,
2105 }
2106 else
2107 {
2109 "Client inserted element %s of size %u twice (ignored)\n",
2110 GNUNET_h2s (&hash),
2111 el.size);
2112 /* same element inserted twice */
2113 return;
2114 }
2116}
2117
2118
2125static void
2127{
2129 {
2130 set->content->latest_generation++;
2131 set->current_generation++;
2132 return;
2133 }
2135}
2136
2137
2147static int
2149 const struct GNUNET_SETI_EvaluateMessage *msg)
2150{
2151 /* FIXME: suboptimal, even if the context below could be NULL,
2152 there are malformed messages this does not check for... */
2153 return GNUNET_OK;
2154}
2155
2156
2165static void
2167 const struct GNUNET_SETI_EvaluateMessage *msg)
2168{
2169 struct ClientState *cs = cls;
2170 struct Operation *op = GNUNET_new (struct Operation);
2171 const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2172 GNUNET_MQ_hd_var_size (incoming_msg,
2175 op),
2176 GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2179 op),
2180 GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2182 struct BFMessage,
2183 op),
2184 GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2187 op),
2189 };
2190 struct Set *set;
2191 const struct GNUNET_MessageHeader *context;
2192
2193 if (NULL == (set = cs->set))
2194 {
2195 GNUNET_break (0);
2196 GNUNET_free (op);
2198 return;
2199 }
2201 UINT32_MAX);
2202 op->peer = msg->target_peer;
2203 op->return_intersection = htonl (msg->return_intersection);
2204 fprintf (stderr,
2205 "Return intersection for evaluate is %d\n",
2206 op->return_intersection);
2207 op->client_request_id = ntohl (msg->request_id);
2209
2210 /* Advance generation values, so that
2211 mutations won't interfere with the running operation. */
2212 op->set = set;
2213 op->generation_created = set->current_generation;
2214 advance_generation (set);
2216 set->ops_tail,
2217 op);
2219 "Creating new CADET channel to port %s for set intersection\n",
2220 GNUNET_h2s (&msg->app_id));
2222 op,
2223 &msg->target_peer,
2224 &msg->app_id,
2227 cadet_handlers);
2228 op->mq = GNUNET_CADET_get_mq (op->channel);
2229 {
2230 struct GNUNET_MQ_Envelope *ev;
2231 struct OperationRequestMessage *msg_tmp;
2232
2233 ev = GNUNET_MQ_msg_nested_mh (msg_tmp,
2235 context);
2236 if (NULL == ev)
2237 {
2238 /* the context message is too large!? */
2239 GNUNET_break (0);
2241 return;
2242 }
2244 "Initiating intersection operation evaluation\n");
2245 /* we started the operation, thus we have to send the operation request */
2246 op->phase = PHASE_INITIAL;
2247 op->my_element_count = op->set->current_set_element_count;
2248 op->my_elements
2249 = GNUNET_CONTAINER_multihashmap_create (op->my_element_count,
2250 GNUNET_YES);
2251
2252 msg_tmp->element_count = htonl (op->my_element_count);
2253 GNUNET_MQ_send (op->mq,
2254 ev);
2255 op->phase = PHASE_COUNT_SENT;
2256 if (NULL != context)
2258 "Sent op request with context message\n");
2259 else
2261 "Sent op request without context message\n");
2262 }
2264}
2265
2266
2273static void
2275 const struct GNUNET_SETI_CancelMessage *msg)
2276{
2277 struct ClientState *cs = cls;
2278 struct Set *set;
2279 struct Operation *op;
2280 int found;
2281
2282 if (NULL == (set = cs->set))
2283 {
2284 /* client without a set requested an operation */
2285 GNUNET_break (0);
2287 return;
2288 }
2289 found = GNUNET_NO;
2290 for (op = set->ops_head; NULL != op; op = op->next)
2291 {
2292 if (op->client_request_id == ntohl (msg->request_id))
2293 {
2294 found = GNUNET_YES;
2295 break;
2296 }
2297 }
2298 if (GNUNET_NO == found)
2299 {
2300 /* It may happen that the operation was already destroyed due to
2301 * the other peer disconnecting. The client may not know about this
2302 * yet and try to cancel the (just barely non-existent) operation.
2303 * So this is not a hard error.
2304 *///
2306 "Client canceled non-existent op %u\n",
2307 (uint32_t) ntohl (msg->request_id));
2308 }
2309 else
2310 {
2312 "Client requested cancel for op %u\n",
2313 (uint32_t) ntohl (msg->request_id));
2315 }
2317}
2318
2319
2328static void
2330 const struct GNUNET_SETI_AcceptMessage *msg)
2331{
2332 struct ClientState *cs = cls;
2333 struct Set *set;
2334 struct Operation *op;
2335 struct GNUNET_SETI_ResultMessage *result_message;
2336 struct GNUNET_MQ_Envelope *ev;
2337 struct Listener *listener;
2338
2339 if (NULL == (set = cs->set))
2340 {
2341 /* client without a set requested to accept */
2342 GNUNET_break (0);
2344 return;
2345 }
2346 op = get_incoming (ntohl (msg->accept_reject_id));
2347 if (NULL == op)
2348 {
2349 /* It is not an error if the set op does not exist -- it may
2350 * have been destroyed when the partner peer disconnected. */
2351 GNUNET_log (
2353 "Client %p accepted request %u of listener %p that is no longer active\n",
2354 cs,
2355 ntohl (msg->accept_reject_id),
2356 cs->listener);
2357 ev = GNUNET_MQ_msg (result_message,
2359 result_message->request_id = msg->request_id;
2360 result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
2361 GNUNET_MQ_send (set->cs->mq, ev);
2363 return;
2364 }
2366 "Client accepting request %u\n",
2367 (uint32_t) ntohl (msg->accept_reject_id));
2368 listener = op->listener;
2369 op->listener = NULL;
2370 op->return_intersection = htonl (msg->return_intersection);
2371 fprintf (stderr,
2372 "Return intersection for accept is %d\n",
2373 op->return_intersection);
2375 listener->op_tail,
2376 op);
2377 op->set = set;
2379 set->ops_tail,
2380 op);
2381 op->client_request_id = ntohl (msg->request_id);
2382
2383 /* Advance generation values, so that future mutations do not
2384 interfere with the running operation. */
2385 op->generation_created = set->current_generation;
2386 advance_generation (set);
2387 {
2389 "Accepting set intersection operation\n");
2390 op->phase = PHASE_INITIAL;
2391 op->my_element_count
2392 = op->set->current_set_element_count;
2393 op->my_elements
2395 GNUNET_MIN (op->my_element_count,
2396 op->remote_element_count),
2397 GNUNET_YES);
2398 if (op->remote_element_count < op->my_element_count)
2399 {
2400 /* If the other peer (Alice) has fewer elements than us (Bob),
2401 we just send the count as Alice should send the first BF */
2403 op->phase = PHASE_COUNT_SENT;
2404 }
2405 else
2406 {
2407 /* We have fewer elements, so we start with the BF */
2409 }
2410 }
2411 /* Now allow CADET to continue, as we did not do this in
2412 #handle_incoming_msg (as we wanted to first see if the
2413 local client would accept the request). */
2414 GNUNET_CADET_receive_done (op->channel);
2416}
2417
2418
2424static void
2425shutdown_task (void *cls)
2426{
2427 /* Delay actual shutdown to allow service to disconnect clients */
2429 if (0 == num_clients)
2430 {
2431 if (NULL != cadet)
2432 {
2434 cadet = NULL;
2435 }
2436 }
2438 GNUNET_YES);
2440 "handled shutdown request\n");
2441}
2442
2443
2452static void
2453run (void *cls,
2454 const struct GNUNET_CONFIGURATION_Handle *cfg,
2456{
2457 /* FIXME: need to modify SERVICE (!) API to allow
2458 us to run a shutdown task *after* clients were
2459 forcefully disconnected! */
2461 NULL);
2463 cfg);
2465 if (NULL == cadet)
2466 {
2468 _ ("Could not connect to CADET service\n"));
2470 return;
2471 }
2472}
2473
2474
2479 "seti",
2481 &run,
2484 NULL,
2485 GNUNET_MQ_hd_fixed_size (client_accept,
2488 NULL),
2489 GNUNET_MQ_hd_var_size (client_set_add,
2492 NULL),
2493 GNUNET_MQ_hd_fixed_size (client_create_set,
2496 NULL),
2497 GNUNET_MQ_hd_var_size (client_evaluate,
2500 NULL),
2501 GNUNET_MQ_hd_fixed_size (client_listen,
2504 NULL),
2505 GNUNET_MQ_hd_fixed_size (client_reject,
2508 NULL),
2509 GNUNET_MQ_hd_fixed_size (client_cancel,
2512 NULL),
2514
2515
2516/* end of gnunet-service-seti.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:143
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:108
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static struct GNUNET_IDENTITY_EgoLookup * el
Handle to identity lookup.
static char * res
Currently read line or NULL on EOF.
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
IntersectionOperationPhase
Current phase we are in for a intersection operation.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_evaluate(void *cls, const struct GNUNET_SETI_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_intersection_p2p_bf(void *cls, const struct BFMessage *msg)
Handle an BF message from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static int check_intersection_p2p_bf(void *cls, const struct BFMessage *msg)
Check an BF message from a remote peer.
@ PHASE_INITIAL
We are just starting.
@ PHASE_DONE_RECEIVED
We have received the P2P DONE message, and must finish with the local client before terminating the c...
@ PHASE_FINISHED
The protocol is over.
@ PHASE_BF_EXCHANGE
We have initialized our set and are now reducing it by exchanging Bloom filters until one party notic...
@ PHASE_COUNT_SENT
We have send the number of our elements to the other peer, but did not setup our element set yet.
@ PHASE_MUST_SEND_DONE
We must next send the P2P DONE message (after finishing mostly with the local client).
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static int iterator_bf_reduce(void *cls, const struct GNUNET_HashCode *key, void *value)
Removes elements from our hashmap if they are not contained within the provided remote bloomfilter.
static void handle_client_evaluate(void *cls, const struct GNUNET_SETI_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETI_ListenMessage *msg)
Called when a client wants to create a new listener.
static int check_client_set_add(void *cls, const struct GNUNET_SETI_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static void begin_bf_exchange(struct Operation *op)
We go first, initialize our map with all elements and send the first Bloom filter.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static int filter_all(void *cls, const struct GNUNET_HashCode *key, void *value)
Remove all elements from our hashmap.
static int filtered_map_initialization(void *cls, const struct GNUNET_HashCode *key, void *value)
Fills the "my_elements" hashmap with all relevant elements.
static void send_p2p_done(struct Operation *op)
Notify the other peer that we are done.
static void send_remaining_elements(void *cls)
Send all elements in the full result iterator.
static void handle_client_reject(void *cls, const struct GNUNET_SETI_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static void fail_intersection_operation(struct Operation *op)
Inform the client that the intersection operation has failed, and proceed to destroy the evaluate ope...
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void finished_local_operations(void *cls)
Remember that we are done dealing with the local client AND have sent the other peer our message that...
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void handle_client_accept(void *cls, const struct GNUNET_SETI_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void send_client_done_and_destroy(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void send_client_removed_element(struct Operation *op, struct GNUNET_SETI_Element *element)
If applicable in the current operation mode, send a result message to the client indicating we remove...
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int initialize_map_unfiltered(void *cls, const struct GNUNET_HashCode *key, void *value)
Fills the "my_elements" hashmap with the initial set of (non-deleted) elements from the set of the sp...
static void send_bloomfilter(struct Operation *op)
Send a bloomfilter to our peer.
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void process_bf(struct Operation *op)
Process a Bloomfilter once we got all the chunks.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static void send_element_count(struct Operation *op)
Send our element count to the peer, in case our element count is lower than theirs.
static void handle_intersection_p2p_element_info(void *cls, const struct IntersectionElementInfoMessage *msg)
Handle the initial struct IntersectionElementInfoMessage from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_cancel(void *cls, const struct GNUNET_SETI_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void handle_client_create_set(void *cls, const struct GNUNET_SETI_CreateMessage *msg)
Called when a client wants to create a new set.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static int iterator_bf_create(void *cls, const struct GNUNET_HashCode *key, void *value)
Create initial bloomfilter based on all the elements given.
static void handle_client_set_add(void *cls, const struct GNUNET_SETI_ElementMessage *msg)
Called when a client wants to add an element to a set it inhabits.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static void handle_intersection_p2p_done(void *cls, const struct IntersectionDoneMessage *idm)
Handle a done message from a remote peer.
GNUNET_SERVICE_MAIN("seti", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETI_ACCEPT, struct GNUNET_SETI_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETI_ADD, struct GNUNET_SETI_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETI_CREATE, struct GNUNET_SETI_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETI_EVALUATE, struct GNUNET_SETI_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETI_LISTEN, struct GNUNET_SETI_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETI_REJECT, struct GNUNET_SETI_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETI_CANCEL, struct GNUNET_SETI_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Peer-to-Peer messages for gnunet set.
Library for data block manipulation.
CADET service; establish channels to distant peers.
Two-peer set intersection operations.
API to create, modify and access statistics.
void GNUNET_BLOCK_mingle_hash(const struct GNUNET_HashCode *in, uint32_t mingle_number, struct GNUNET_HashCode *hc)
Mingle hash with the mingle_number to produce different bits.
Definition: block.c:96
struct GNUNET_CONTAINER_BloomFilter * GNUNET_CONTAINER_bloomfilter_init(const char *data, size_t size, unsigned int k)
Create a Bloom filter from raw bits.
void GNUNET_CONTAINER_bloomfilter_add(struct GNUNET_CONTAINER_BloomFilter *bf, const struct GNUNET_HashCode *e)
Add an element to the filter.
bool GNUNET_CONTAINER_bloomfilter_test(const struct GNUNET_CONTAINER_BloomFilter *bf, const struct GNUNET_HashCode *e)
Test if an element is in the filter.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_bloomfilter_get_raw_data(const struct GNUNET_CONTAINER_BloomFilter *bf, char *data, size_t size)
Copy the raw data of this Bloom filter into the given data array.
void GNUNET_CONTAINER_bloomfilter_free(struct GNUNET_CONTAINER_BloomFilter *bf)
Free the space associated with a filter in memory, flush to drive if needed (do not free the space on...
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Indicate readiness to receive the next message on a channel.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected channel.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
void GNUNET_CRYPTO_hash_xor(const struct GNUNET_HashCode *a, const struct GNUNET_HashCode *b, struct GNUNET_HashCode *result)
compute result = a ^ b
Definition: crypto_hash.c:135
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator's position.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_memcmp(a, b)
Compare memory in a and b, where both must be of the same pointer type.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MIN(a, b)
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:305
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:63
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:78
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:655
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SETI_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
Request to begin set intersection operation.
#define GNUNET_MESSAGE_TYPE_SETI_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETI_EVALUATE
Evaluate a set operation.
#define GNUNET_MESSAGE_TYPE_SETI_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_BF
Bloom filter message for intersection exchange started by Bob.
#define GNUNET_MESSAGE_TYPE_SETI_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETI_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETI_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETI_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO
Information about the element count for intersection.
#define GNUNET_MESSAGE_TYPE_SETI_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_DONE
Intersection operation is done.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:566
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1338
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:979
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1276
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2377
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2348
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETI_element_hash(const struct GNUNET_SETI_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: seti_api.c:849
@ GNUNET_SETI_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETI_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
@ GNUNET_SETI_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETI_STATUS_DEL_LOCAL
Element should be delete from the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
Bloom filter messages exchanged for set intersection calculation.
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
struct GNUNET_HashCode element_hash
Hash of the element.
unsigned int generation_added
Generation in which the element was added.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:135
Handle to a client that is connected to a service.
Definition: service.c:245
Handle to a service.
Definition: service.c:116
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: seti.h:77
Sent to the service by the client in order to cancel a set operation.
Definition: seti.h:252
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: seti.h:40
Message sent by client to the service to add an element to the set.
Definition: seti.h:227
Element stored in a set.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: seti.h:152
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: seti.h:54
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: seti.h:107
A request for an operation with another client.
Definition: seti.h:124
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: seti.h:139
uint32_t accept_id
ID of the to identify the request when accepting or rejecting it.
Definition: seti.h:134
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: seti.h:192
uint32_t request_id
id the result belongs to
Definition: seti.h:206
uint16_t element_type
Type of the element attached to the message, if any.
Definition: seti.h:217
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETI_Status in NBO.
Definition: seti.h:212
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Last message, send to confirm the final set.
uint32_t final_element_count
Final number of elements in intersection.
struct GNUNET_HashCode element_xor_hash
XOR of all hashes over all elements remaining in the set.
During intersection, the first (and possibly second) message send it the number of elements in the se...
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
uint32_t element_count
For Intersection: my element count.
Operation context used to execute a set operation.
unsigned int generation_created
Generation in which the operation handle was created.
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
int channel_death_expected
Set whenever we reach the state where the death of the channel is perfectly find and should NOT resul...
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
struct GNUNET_CONTAINER_BloomFilter * local_bf
BF of the set's element.
uint32_t bf_data_offset
How many bytes of bf_data are valid?
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
uint32_t bf_bits_per_element
size of the bloomfilter
struct GNUNET_CONTAINER_BloomFilter * remote_bf
The bf we currently receive.
struct GNUNET_HashCode my_xor
XOR of the keys of all of the elements (remaining) in my set.
struct GNUNET_HashCode other_xor
XOR of the keys of all of the elements (remaining) in the other peer's set.
char * bf_data
For multipart BF transmissions, we have to store the bloomfilter-data until we fully received it.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
int client_done_sent
Did we send the client that we are done?
enum IntersectionOperationPhase phase
Current state of the operation.
struct GNUNET_CONTAINER_MultiHashMapIterator * full_result_iter
Iterator for sending the final set of my_elements to the client.
int return_intersection
When are elements sent to the client, and which elements are sent?
struct Listener * listener
Port this operation runs on.
uint32_t bf_data_size
size of the bloomfilter in bf_data.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
uint32_t salt
Salt to use for the operation.
uint32_t my_element_count
Current element count contained within my_elements.
uint32_t remote_element_count
Remote peers element count.
uint32_t client_request_id
ID used to identify an operation between service and client.
struct GNUNET_CONTAINER_MultiHashMap * my_elements
Remaining elements in the intersection operation.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
unsigned int refcount
Number of references to the content.
A set that supports a specific operation with other peers.
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct Set * prev
Sets are held in a doubly linked list.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
uint32_t current_set_element_count
Number of currently valid elements in the set which have not been removed.
struct ClientState * cs
Client that owns the set.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...