GNUnet  0.19.4
gnunet-service-seti.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2013-2017, 2020 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
26 #include "platform.h"
29 #include "gnunet_cadet_service.h"
30 #include "gnunet_seti_service.h"
31 #include "gnunet_block_lib.h"
32 #include "seti.h"
33 
38 #define INCOMING_CHANNEL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
39 
40 
/* NOTE(review): the declaration header and every enumerator of this block
   were lost in extraction.  Presumably this is the enum that defines the
   PHASE_* constants used below (PHASE_INITIAL, PHASE_COUNT_SENT,
   PHASE_BF_EXCHANGE, PHASE_MUST_SEND_DONE, PHASE_DONE_RECEIVED,
   PHASE_FINISHED) -- confirm against the upstream file. */
45 {
50 
56 
63 
69 
75 
81 };
82 
83 
87 struct Set;
88 
95 struct ElementEntry;
96 
100 struct Operation;
101 
102 
/**
 * Bookkeeping record for one element stored in a set.
 *
 * NOTE(review): members were dropped from this listing.  Code further down
 * reads `element` (with `size`, `data`, `element_type`) and `element_hash`
 * through pointers to this struct, so those members presumably belong in the
 * gaps below -- confirm against the upstream file.
 */
109 struct ElementEntry
110 {
116 
122 
/* Compared against an operation's `generation_created` to decide whether
   the element takes part in that operation. */
126  unsigned int generation_added;
127 
/* Set to GNUNET_NO when the element is inserted by the local client. */
132  int remote;
133 };
134 
135 
140 struct Listener;
141 
142 
/**
 * Per-client state, allocated when a client connects and freed when it
 * disconnects.
 *
 * NOTE(review): the connect callback below also assigns `client` and `mq`
 * members that are not visible here; they were presumably dropped from the
 * gaps at the end of this struct.
 */
146 struct ClientState
147 {
/* Set created by this client, or NULL; at most one per client. */
151  struct Set *set;
152 
/* Listener opened by this client, or NULL; at most one per client. */
156  struct Listener *listener;
157 
162 
167 };
168 
169 
/**
 * State of one set-intersection operation with a remote peer.
 *
 * NOTE(review): many members were dropped from this listing.  The code below
 * also accesses `channel`, `timeout_task`, `context_msg`, `remote_bf`,
 * `local_bf`, `my_elements`, `full_result_iter`, `my_element_count`, `phase`,
 * `bf_bits_per_element`, `return_intersection`, `channel_death_expected` and
 * `client_done_sent`; their declarations presumably sat in the gaps.
 */
173 struct Operation
174 {
/* Identity of the remote peer (copied from the channel source, or from the
   client's evaluate request). */
179  struct GNUNET_PeerIdentity peer;
180 
/* Running XOR over the hashes of the elements currently in `my_elements`;
   updated on every add and remove. */
186  struct GNUNET_HashCode my_xor;
187 
/* XOR hash reported by the other peer in its most recent Bloom filter
   message. */
193  struct GNUNET_HashCode other_xor;
194 
/* Doubly-linked list links (listener's or set's operation list). */
198  struct Operation *next;
199 
203  struct Operation *prev;
204 
209 
/* Listener this not-yet-accepted operation belongs to; NULL otherwise. */
213  struct Listener *listener;
214 
/* Message queue of the CADET channel to the remote peer. */
218  struct GNUNET_MQ_Handle *mq;
219 
224 
/* Set the operation runs on; NULL while the operation is only "incoming". */
229  struct Set *set;
230 
235 
240 
246 
251 
/* Reassembly buffer for a Bloom filter that arrives in multiple messages;
   NULL when no multipart transfer is in progress. */
256  char *bf_data;
257 
263 
/* Number of bytes of `bf_data` received so far. */
267  uint32_t bf_data_offset;
268 
274 
/* Total size in bytes announced for the multipart Bloom filter. */
278  uint32_t bf_data_size;
279 
284 
/* Value used when mingling element hashes for Bloom filter tests; taken from
   the peer's `sender_mutator` and echoed back in our own filter messages. */
289  uint32_t salt;
290 
295 
/* Generation of the set at the time the operation was started. */
299  unsigned int generation_created;
300 
305 
312 
/* Element count most recently reported by the remote peer. */
316  uint32_t remote_element_count;
317 
/* Request ID chosen by the client; echoed in result messages. */
321  uint32_t client_request_id;
322 
327 
/* ID under which an incoming request was suggested to the listening client;
   0 while no suggestion has been made. */
333  uint32_t suggest_id;
334 
335 };
336 
337 
/**
 * Reference-counted content of a set.
 *
 * NOTE(review): the `elements` hash map accessed throughout the file is not
 * visible here; its declaration presumably sat in the gap below.
 */
342 struct SetContent
343 {
348 
/* Number of holders; the content is freed when this drops to 0. */
352  unsigned int refcount;
353 
/* Highest generation handed out so far for this content. */
357  unsigned int latest_generation;
358 
/* NOTE(review): not referenced in the visible part of the file; by name,
   a count of active iterations -- confirm. */
362  int iterator_count;
363 };
364 
365 
/**
 * A set owned by one client, together with the operations running on it.
 */
369 struct Set
370 {
/* Doubly-linked list links. */
374  struct Set *next;
375 
379  struct Set *prev;
380 
/* Client that owns this set. */
385  struct ClientState *cs;
386 
/* Reference-counted element storage. */
391  struct SetContent *content;
392 
398 
/* Head and tail of the list of operations evaluating this set. */
402  struct Operation *ops_head;
403 
407  struct Operation *ops_tail;
408 
/* Generation the set is currently in; recorded by each new operation and
   then advanced. */
413  unsigned int current_generation;
414 
415 };
416 
417 
/**
 * A client's registration for incoming intersection requests on one
 * application ID.
 *
 * NOTE(review): the `open_port` member used by the listen and disconnect
 * handlers is not visible here; presumably it sat in the gap below.
 */
422 struct Listener
423 {
/* Links in the global listener list. */
427  struct Listener *next;
428 
432  struct Listener *prev;
433 
/* Head and tail of the list of incoming operations that have not been
   accepted or rejected yet. */
439  struct Operation *op_head;
440 
446  struct Operation *op_tail;
447 
/* Client that created the listener. */
452  struct ClientState *cs;
453 
458 
/* Application ID, used as the CADET port to listen on. */
463  struct GNUNET_HashCode app_id;
464 
465 };
466 
467 
/* Handle to the CADET service; used to open ports and create channels. */
472 static struct GNUNET_CADET_Handle *cadet;
473 
478 
/* Head and tail of the global list of listeners. */
482 static struct Listener *listener_head;
483 
487 static struct Listener *listener_tail;
488 
/* Number of currently connected clients. */
492 static unsigned int num_clients;
493 
/* GNUNET_YES once shutdown has begun; the last client to disconnect then
   triggers the CADET cleanup. */
498 static int in_shutdown;
499 
/* Counter from which suggest IDs for incoming requests are drawn; 0 is
   skipped because it marks "no suggestion" in an operation. */
506 static uint32_t suggest_id;
507 
508 
/**
 * Report one element to the client that owns the operation's set, in the
 * mode where the client is told which elements were removed.
 *
 * NOTE(review): the line with the function name and the `op` parameter, the
 * log/statistics call heads, the message-type argument and the assignment of
 * the result status were dropped from this listing.
 *
 * @param element element to transmit; `element->size` bytes of
 *        `element->data` are copied behind the message header
 */
516 static void
518  struct GNUNET_SETI_Element *element)
519 {
520  struct GNUNET_MQ_Envelope *ev;
521  struct GNUNET_SETI_ResultMessage *rm;
522 
523  if (GNUNET_YES == op->return_intersection)
524  {
525  GNUNET_break (0);
526  return; /* Wrong mode for transmitting removed elements */
527  }
529  "Sending removed element (size %u) to client\n",
530  element->size);
532  "# Element removed messages sent",
533  1,
534  GNUNET_NO);
535  GNUNET_assert (0 != op->client_request_id);
536  ev = GNUNET_MQ_msg_extra (rm,
537  element->size,
539  if (NULL == ev)
540  {
541  GNUNET_break (0);
542  return;
543  }
545  rm->request_id = htonl (op->client_request_id);
/* NOTE(review): the type is copied without byte-order conversion, while the
   set-add handler stores it after ntohs() and other result messages use
   htons(0) -- check whether htons() is missing here. */
546  rm->element_type = element->element_type;
547  GNUNET_memcpy (&rm[1],
548  element->data,
549  element->size);
550  GNUNET_MQ_send (op->set->cs->mq,
551  ev);
552 }
553 
554 
/**
 * Test whether an element belongs to the generation an operation works on.
 *
 * NOTE(review): the line with the function name and the `ee` parameter was
 * dropped from this listing.
 *
 * @param op operation to test against
 * @return non-zero if the element was added no later than the generation in
 *         which @a op was created, 0 otherwise
 */
562 static int
564  struct Operation *op)
565 {
566  return op->generation_created >= ee->generation_added;
567 }
568 
569 
/**
 * Hash map iterator ("FIMA" in the log output) that builds the operation's
 * working element map while filtering against the remote Bloom filter.
 * Elements from the wrong generation are skipped; elements that fail the
 * Bloom filter test are reported as removed; all others are counted, XORed
 * into `my_xor` and stored.
 *
 * NOTE(review): the function-name line and the heads of several calls
 * (logging, generation test, hash mingling, Bloom filter test, removal
 * notification, map insertion) were dropped from this listing.
 *
 * @param key unused in the visible code
 * @param value the `struct ElementEntry` being visited
 * @return always GNUNET_YES, so iteration continues
 */
578 static int
580  const struct GNUNET_HashCode *key,
581  void *value)
582 {
583  struct Operation *op = cls;
584  struct ElementEntry *ee = value;
585  struct GNUNET_HashCode mutated_hash;
586 
588  "FIMA called for %s:%u\n",
589  GNUNET_h2s (&ee->element_hash),
590  ee->element.size);
591 
593  {
595  "Reduced initialization, not starting with %s:%u (wrong generation)\n",
596  GNUNET_h2s (&ee->element_hash),
597  ee->element.size);
598  return GNUNET_YES; /* element not valid in our operation's generation */
599  }
600 
601  /* Test if element is in other peer's bloomfilter */
603  op->salt,
604  &mutated_hash);
606  "Testing mingled hash %s with salt %u\n",
607  GNUNET_h2s (&mutated_hash),
608  op->salt);
609  if (GNUNET_NO ==
611  &mutated_hash))
612  {
613  /* remove this element */
615  &ee->element);
617  "Reduced initialization, not starting with %s:%u\n",
618  GNUNET_h2s (&ee->element_hash),
619  ee->element.size);
620  return GNUNET_YES;
621  }
/* Element survived the filter: account for it in count and XOR checksum. */
622  op->my_element_count++;
623  GNUNET_CRYPTO_hash_xor (&op->my_xor,
624  &ee->element_hash,
625  &op->my_xor);
627  "Filtered initialization of my_elements, adding %s:%u\n",
628  GNUNET_h2s (&ee->element_hash),
629  ee->element.size);
632  &ee->element_hash,
633  ee,
635 
636  return GNUNET_YES;
637 }
638 
639 
/**
 * Hash map iterator that shrinks the working element map using the remote
 * Bloom filter: an element whose mingled hash is not in the filter is taken
 * out of the count and XOR checksum, removed and reported to the client.
 *
 * NOTE(review): the function-name line and the heads of several calls were
 * dropped from this listing.
 *
 * @param key unused in the visible code
 * @param value the `struct ElementEntry` being visited
 * @return always GNUNET_YES, so iteration continues
 */
649 static int
651  const struct GNUNET_HashCode *key,
652  void *value)
653 {
654  struct Operation *op = cls;
655  struct ElementEntry *ee = value;
656  struct GNUNET_HashCode mutated_hash;
657 
659  op->salt,
660  &mutated_hash);
662  "Testing mingled hash %s with salt %u\n",
663  GNUNET_h2s (&mutated_hash),
664  op->salt);
665  if (GNUNET_NO ==
667  &mutated_hash))
668  {
669  GNUNET_break (0 < op->my_element_count);
670  op->my_element_count--;
/* XOR is its own inverse, so this removes the element from the checksum. */
671  GNUNET_CRYPTO_hash_xor (&op->my_xor,
672  &ee->element_hash,
673  &op->my_xor);
675  "Bloom filter reduction of my_elements, removing %s:%u\n",
676  GNUNET_h2s (&ee->element_hash),
677  ee->element.size);
680  &ee->element_hash,
681  ee));
683  &ee->element);
684  }
685  else
686  {
688  "Bloom filter reduction of my_elements, keeping %s:%u\n",
689  GNUNET_h2s (&ee->element_hash),
690  ee->element.size);
691  }
692  return GNUNET_YES;
693 }
694 
695 
/**
 * Hash map iterator used while building our own Bloom filter: mingles the
 * element hash with the operation's salt and (per the log text) enters the
 * result into the filter.
 *
 * NOTE(review): the function-name line and the heads of the mingle, log and
 * Bloom-filter-add calls were dropped from this listing.
 *
 * @param key unused in the visible code
 * @param value the `struct ElementEntry` being visited
 * @return always GNUNET_YES, so iteration continues
 */
704 static int
706  const struct GNUNET_HashCode *key,
707  void *value)
708 {
709  struct Operation *op = cls;
710  struct ElementEntry *ee = value;
711  struct GNUNET_HashCode mutated_hash;
712 
714  op->salt,
715  &mutated_hash);
717  "Initializing BF with hash %s with salt %u\n",
718  GNUNET_h2s (&mutated_hash),
719  op->salt);
721  &mutated_hash);
722  return GNUNET_YES;
723 }
724 
725 
/**
 * Release the resources held by an operation that is no longer attached to
 * a listener: Bloom filters, working element map, result iterator, context
 * message, its link in the set's operation list and the channel.  The
 * operation structure itself is not freed here (see trailing comment).
 *
 * NOTE(review): the function-name/parameter line and the statements that
 * actually free the Bloom filters, the map, the iterator, unlink from the
 * set and close the channel were dropped from this listing.
 */
738 static void
740 {
741  struct Set *set = op->set;
742  struct GNUNET_CADET_Channel *channel;
743 
744  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying operation %p\n", op);
/* Operations still owned by a listener must go through incoming_destroy. */
745  GNUNET_assert (NULL == op->listener);
746  if (NULL != op->remote_bf)
747  {
749  op->remote_bf = NULL;
750  }
751  if (NULL != op->local_bf)
752  {
754  op->local_bf = NULL;
755  }
756  if (NULL != op->my_elements)
757  {
759  op->my_elements = NULL;
760  }
761  if (NULL != op->full_result_iter)
762  {
764  op->full_result_iter);
765  op->full_result_iter = NULL;
766  }
768  "Destroying intersection op state done\n");
769  if (NULL != set)
770  {
772  set->ops_tail,
773  op);
774  op->set = NULL;
775  }
776  if (NULL != op->context_msg)
777  {
778  GNUNET_free (op->context_msg);
779  op->context_msg = NULL;
780  }
781  if (NULL != (channel = op->channel))
782  {
783  /* This will free op; called conditionally as this helper function
784  is also called from within the channel disconnect handler. */
/* The pointer is cleared first so a re-entrant call sees no channel. */
785  op->channel = NULL;
787  }
788  /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
789  * there was a channel end handler that will free 'op' on the call stack. */
790 }
791 
792 
798 static void
800 
801 
/**
 * Tear down an operation that is still waiting on a listener: detach it
 * from the listener's list and cancel its timeout task.
 *
 * NOTE(review): the function-name/parameter line, the log call head, the
 * list-removal call head and the final statement (listing line 827) were
 * dropped; the name is inferred from the visible call sites.
 */
807 static void
809 {
810  struct Listener *listener;
811 
813  "Destroying incoming operation %p\n",
814  op);
815  if (NULL != (listener = op->listener))
816  {
818  listener->op_tail,
819  op);
820  op->listener = NULL;
821  }
822  if (NULL != op->timeout_task)
823  {
824  GNUNET_SCHEDULER_cancel (op->timeout_task);
825  op->timeout_task = NULL;
826  }
828 }
829 
830 
/**
 * Tell the local client that the intersection finished: sends a result
 * message carrying the client's request ID and element type 0.
 *
 * NOTE(review): the function-name/parameter line (which must declare `cls`),
 * the log/statistics call heads, the message type, the result status
 * assignment and the final statement (listing line 857) were dropped.
 */
837 static void
839 {
840  struct Operation *op = cls;
841  struct GNUNET_MQ_Envelope *ev;
842  struct GNUNET_SETI_ResultMessage *rm;
843 
845  "Intersection succeeded, sending DONE to local client\n");
847  "# Intersection operations succeeded",
848  1,
849  GNUNET_NO);
850  ev = GNUNET_MQ_msg (rm,
852  rm->request_id = htonl (op->client_request_id);
854  rm->element_type = htons (0);
855  GNUNET_MQ_send (op->set->cs->mq,
856  ev);
858 }
859 
860 
/**
 * Finish off an operation whose channel is going away.  Clears the channel
 * pointer, then dispatches on the operation's state: still owned by a
 * listener, attached to a set (with channel death expected or not), or
 * neither.
 *
 * NOTE(review): the function-name/parameter line and the statement inside
 * every branch were dropped from this listing.  In particular the body of
 * the final `else` (listing line 899) is missing, so from this text alone it
 * is not clear whether GNUNET_free() runs only in the else case or
 * unconditionally -- confirm against the upstream file.
 */
866 static void
868 {
869  struct GNUNET_CADET_Channel *channel;
870 
872  "channel_end_cb called\n");
873  if (NULL != (channel = op->channel))
874  {
875  /* This will free op; called conditionally as this helper function
876  is also called from within the channel disconnect handler. */
877  op->channel = NULL;
879  }
880  if (NULL != op->listener)
881  {
883  return;
884  }
885  if (NULL != op->set)
886  {
887  if (GNUNET_YES == op->channel_death_expected)
888  {
889  /* oh goodie, we are done! */
891  }
892  else
893  {
894  /* sorry, channel went down early, too bad. */
896  }
897  }
898  else
900  GNUNET_free (op);
901 }
902 
903 
/**
 * Abort an operation: drop the working element map and send the client a
 * result message with status GNUNET_SETI_STATUS_FAILURE.
 *
 * NOTE(review): the function-name/parameter line, the declaration of `msg`,
 * the log/statistics call heads, the map destruction call, the message type
 * and the final statement (listing line 934) were dropped.
 */
910 static void
912 {
913  struct GNUNET_MQ_Envelope *ev;
915 
917  "Intersection operation failed\n");
919  "# Intersection operations failed",
920  1,
921  GNUNET_NO);
922  if (NULL != op->my_elements)
923  {
925  op->my_elements = NULL;
926  }
927  ev = GNUNET_MQ_msg (msg,
929  msg->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
930  msg->request_id = htonl (op->client_request_id);
931  msg->element_type = htons (0);
932  GNUNET_MQ_send (op->set->cs->mq,
933  ev);
935 }
936 
937 
/**
 * Build a Bloom filter over our current elements and send it to the other
 * peer, either as one message or, when it does not fit into a ~60 KiB
 * message, as a sequence of chunks that each repeat the filter metadata.
 *
 * NOTE(review): the function-name/parameter line, the salt generation, the
 * map iteration that fills the filter, the message-type arguments, the
 * raw-data extraction call heads and the final Bloom filter free were
 * dropped from this listing; the name is inferred from visible call sites.
 */
944 static void
946 {
947  struct GNUNET_MQ_Envelope *ev;
948  struct BFMessage *msg;
949  uint32_t bf_size;
950  uint32_t bf_elementbits;
951  uint32_t chunk_size;
952  char *bf_data;
953  uint32_t offset;
954 
955  /* We consider the ratio of the set sizes to determine
956  the number of bits per element, as the smaller set
957  should use more bits to maximize its set reduction
958  potential and minimize overall bandwidth consumption. */
/* NOTE(review): this divides by op->my_element_count; a zero count gives a
   division by zero in floating point, and a ratio below 1/4 makes the sum
   negative before it is converted to uint32_t.  The `< 1` test below can
   only catch the value 0 on an unsigned variable.  The visible callers
   appear to exclude a zero count -- confirm this holds on all paths. */
959  bf_elementbits = 2 + ceil (log2 ((double)
960  (op->remote_element_count
961  / (double) op->my_element_count)));
962  if (bf_elementbits < 1)
963  bf_elementbits = 1; /* make sure k is not 0 */
964  /* optimize BF-size to ~50% of bits set */
965  bf_size = ceil ((double) (op->my_element_count
966  * bf_elementbits / log (2)));
968  "Sending Bloom filter (%u) of size %u bytes\n",
969  (unsigned int) bf_elementbits,
970  (unsigned int) bf_size);
971  op->local_bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
972  bf_size,
973  bf_elementbits);
975  UINT32_MAX);
978  op);
979 
980  /* send our Bloom filter */
982  "# Intersection Bloom filters sent",
983  1,
984  GNUNET_NO);
/* Largest filter payload that fits in a single message. */
985  chunk_size = 60 * 1024 - sizeof(struct BFMessage);
986  if (bf_size <= chunk_size)
987  {
988  /* singlepart */
989  chunk_size = bf_size;
990  ev = GNUNET_MQ_msg_extra (msg,
991  chunk_size,
995  op->local_bf,
996  (char *) &msg[1],
997  bf_size));
998  msg->sender_element_count = htonl (op->my_element_count);
999  msg->bloomfilter_total_length = htonl (bf_size);
1000  msg->bits_per_element = htonl (bf_elementbits);
1001  msg->sender_mutator = htonl (op->salt);
1002  msg->element_xor_hash = op->my_xor;
1003  GNUNET_MQ_send (op->mq, ev);
1004  }
1005  else
1006  {
1007  /* multipart */
1008  bf_data = GNUNET_malloc (bf_size);
1011  op->local_bf,
1012  bf_data,
1013  bf_size));
1014  offset = 0;
1015  while (offset < bf_size)
1016  {
/* Shrink the final chunk to whatever is left (bf_size > chunk_size holds
   in this branch, so the subtraction cannot wrap). */
1017  if (bf_size - chunk_size < offset)
1018  chunk_size = bf_size - offset;
1019  ev = GNUNET_MQ_msg_extra (msg,
1020  chunk_size,
1022  GNUNET_memcpy (&msg[1],
1023  &bf_data[offset],
1024  chunk_size);
1025  offset += chunk_size;
1026  msg->sender_element_count = htonl (op->my_element_count);
1027  msg->bloomfilter_total_length = htonl (bf_size);
1028  msg->bits_per_element = htonl (bf_elementbits);
1029  msg->sender_mutator = htonl (op->salt);
1030  msg->element_xor_hash = op->my_xor;
1031  GNUNET_MQ_send (op->mq, ev);
1032  }
1033  GNUNET_free (bf_data);
1034  }
1036  op->local_bf = NULL;
1037 }
1038 
1039 
/**
 * Callback run once our DONE message has been sent: marks the operation as
 * finished and records that the channel is now expected to be closed by the
 * other side.
 *
 * NOTE(review): the function-name/parameter line (which must declare `cls`)
 * and the log call head were dropped from this listing.
 */
1048 static void
1050 {
1051  struct Operation *op = cls;
1052 
1054  "DONE sent to other peer, now waiting for other end to close the channel\n");
1055  op->phase = PHASE_FINISHED;
1056  op->channel_death_expected = GNUNET_YES;
1057 }
1058 
1059 
/**
 * Send the other peer a DONE message carrying our final element count and
 * XOR checksum.
 *
 * NOTE(review): the function-name/parameter line, the first statement
 * (listing line 1073), the message type and the head of the call that
 * receives `op` at listing line 1081 (presumably a sent-notification
 * registration) were dropped; the name is inferred from visible call sites.
 */
1067 static void
1069 {
1070  struct GNUNET_MQ_Envelope *ev;
1071  struct IntersectionDoneMessage *idm;
1072 
1074  GNUNET_assert (GNUNET_NO == op->channel_death_expected);
1075  ev = GNUNET_MQ_msg (idm,
1077  idm->final_element_count = htonl (op->my_element_count);
1078  idm->element_xor_hash = op->my_xor;
1081  op);
1082  GNUNET_MQ_send (op->mq,
1083  ev);
1084 }
1085 
1086 
/**
 * Send the next element of the final intersection to the client, one
 * element per invocation.  When the iterator is exhausted it is destroyed
 * and the operation proceeds according to its phase.
 *
 * NOTE(review): the function-name/parameter line (which must declare `cls`),
 * the iterator-next call head, log call heads, the iterator destroy call,
 * the statement at listing line 1122, the message type, the result status
 * assignment and the head of the call receiving `op` at listing line 1153
 * (presumably what re-schedules this function) were dropped.
 */
1092 static void
1094 {
1095  struct Operation *op = cls;
1096  const void *nxt;
1097  const struct ElementEntry *ee;
1098  struct GNUNET_MQ_Envelope *ev;
1099  struct GNUNET_SETI_ResultMessage *rm;
1100  const struct GNUNET_SETI_Element *element;
1101  int res;
1102 
1103  if (GNUNET_NO == op->return_intersection)
1104  {
1105  GNUNET_break (0);
1106  return; /* Wrong mode for transmitting removed elements */
1107  }
1109  op->full_result_iter,
1110  NULL,
1111  &nxt);
1112  if (GNUNET_NO == res)
1113  {
1115  "Sending done and destroy because iterator ran out\n");
1117  op->full_result_iter);
1118  op->full_result_iter = NULL;
1119  if (PHASE_DONE_RECEIVED == op->phase)
1120  {
1121  op->phase = PHASE_FINISHED;
1123  }
1124  else if (PHASE_MUST_SEND_DONE == op->phase)
1125  {
1126  send_p2p_done (op);
1127  }
1128  else
1129  {
/* No other phase starts a full-result iteration in the visible code. */
1130  GNUNET_assert (0);
1131  }
1132  return;
1133  }
1134  ee = nxt;
1135  element = &ee->element;
1137  "Sending element %s:%u to client (full set)\n",
1138  GNUNET_h2s (&ee->element_hash),
1139  element->size);
1140  GNUNET_assert (0 != op->client_request_id);
1141  ev = GNUNET_MQ_msg_extra (rm,
1142  element->size,
1144  GNUNET_assert (NULL != ev);
1146  rm->request_id = htonl (op->client_request_id);
/* NOTE(review): copied without byte-order conversion; see whether htons()
   is needed, as other result messages use htons(0). */
1147  rm->element_type = element->element_type;
1148  GNUNET_memcpy (&rm[1],
1149  element->data,
1150  element->size);
1153  op);
1154  GNUNET_MQ_send (op->set->cs->mq,
1155  ev);
1156 }
1157 
1158 
/**
 * Hash map iterator that seeds the operation's working element map without
 * any Bloom filter: each visited element is XORed into `my_xor` and put
 * into `my_elements`, unless the guard at the top returns early.
 *
 * NOTE(review): the function-name line, the condition guarding the early
 * return (presumably the generation test), the log call head, the statement
 * at listing line 1185 and the tail of the map put call were dropped.
 *
 * @param key unused in the visible code
 * @param value the `struct ElementEntry` being visited
 * @return always GNUNET_YES, so iteration continues
 */
1168 static int
1170  const struct GNUNET_HashCode *key,
1171  void *value)
1172 {
1173  struct ElementEntry *ee = value;
1174  struct Operation *op = cls;
1175 
1177  return GNUNET_YES; /* element not live in operation's generation */
1178  GNUNET_CRYPTO_hash_xor (&op->my_xor,
1179  &ee->element_hash,
1180  &op->my_xor);
1182  "Initial full initialization of my_elements, adding %s:%u\n",
1183  GNUNET_h2s (&ee->element_hash),
1184  ee->element.size);
1186  GNUNET_CONTAINER_multihashmap_put (op->my_elements,
1187  &ee->element_hash,
1188  ee,
1190  return GNUNET_YES;
1191 }
1192 
1193 
/**
 * Send the other peer a message containing our current element count.
 *
 * NOTE(review): the function-name/parameter line, the declaration of `msg`,
 * the log call head and the message type were dropped from this listing.
 */
1200 static void
1202 {
1203  struct GNUNET_MQ_Envelope *ev;
1205 
1207  "Sending our element count (%u)\n",
1208  op->my_element_count);
1209  ev = GNUNET_MQ_msg (msg,
1211  msg->sender_element_count = htonl (op->my_element_count);
1212  GNUNET_MQ_send (op->mq, ev);
1213 }
1214 
1215 
/**
 * Enter the Bloom filter exchange phase: iterate over all elements of the
 * set's content to prepare the operation, then send our first filter.
 *
 * NOTE(review): the function-name/parameter line and the iterator callback
 * argument (listing line 1227) were dropped from this listing.
 */
1222 static void
1224 {
1225  op->phase = PHASE_BF_EXCHANGE;
1226  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1228  op);
1229  send_bloomfilter (op);
1230 }
1231 
1232 
/**
 * Handle the other peer's element-count message.  Records the remote count
 * and rejects the message when it arrives in the wrong phase, when we have
 * more elements than the peer, or when either side has no elements.
 *
 * NOTE(review): the function-name line, the message type of the `msg`
 * parameter, the log call head, the statement in the error branch (listing
 * line 1259) and the statement at listing line 1263 were dropped.  The name
 * is inferred from the handler tables further down.
 */
1240 static void
1242  const struct
1244 {
1245  struct Operation *op = cls;
1246 
1247  op->remote_element_count = ntohl (msg->sender_element_count);
1249  "Received remote element count (%u), I have %u\n",
1250  op->remote_element_count,
1251  op->my_element_count);
1252  if (((PHASE_INITIAL != op->phase) &&
1253  (PHASE_COUNT_SENT != op->phase)) ||
1254  (op->my_element_count > op->remote_element_count) ||
1255  (0 == op->my_element_count) ||
1256  (0 == op->remote_element_count))
1257  {
1258  GNUNET_break_op (0);
1260  return;
1261  }
1262  GNUNET_break (NULL == op->remote_bf);
1264  GNUNET_CADET_receive_done (op->channel);
1265 }
1266 
1267 
/**
 * Apply a completely received remote Bloom filter.  Depending on the phase
 * this either builds the working element map with filtering (first filter)
 * or reduces the existing map.  Afterwards the exchange ends if nothing is
 * left or if count and XOR checksum match the peer's; otherwise we answer
 * with our own Bloom filter.
 *
 * NOTE(review): the function-name/parameter line, log call heads, the
 * statements in the rejecting phases, the iterator callbacks, the Bloom
 * filter free calls and the code that creates and starts the full-result
 * iterator were dropped; the name is inferred from visible call sites.
 */
1273 static void
1275 {
1277  "Received BF in phase %u, foreign count is %u, my element count is %u/%u\n",
1278  op->phase,
1279  op->remote_element_count,
1280  op->my_element_count,
1281  GNUNET_CONTAINER_multihashmap_size (op->set->content->elements));
1282  switch (op->phase)
1283  {
1284  case PHASE_INITIAL:
1285  GNUNET_break_op (0);
1287  return;
1288  case PHASE_COUNT_SENT:
1289  /* This is the first BF being sent, build our initial map with
1290  filtering in place */
1291  op->my_element_count = 0;
1292  GNUNET_CONTAINER_multihashmap_iterate (op->set->content->elements,
1294  op);
1295  break;
1296  case PHASE_BF_EXCHANGE:
1297  /* Update our set by reduction */
1300  op);
1301  break;
1302  case PHASE_MUST_SEND_DONE:
1303  GNUNET_break_op (0);
1305  return;
1306  case PHASE_DONE_RECEIVED:
1307  GNUNET_break_op (0);
1309  return;
1310  case PHASE_FINISHED:
1311  GNUNET_break_op (0);
1313  return;
1314  }
1316  op->remote_bf = NULL;
1317 
1318  if ((0 == op->my_element_count) || /* fully disjoint */
1319  ((op->my_element_count == op->remote_element_count) &&
1320  (0 == GNUNET_memcmp (&op->my_xor,
1321  &op->other_xor))))
1322  {
1323  /* we are done */
1324  op->phase = PHASE_MUST_SEND_DONE;
1326  "Intersection succeeded, sending DONE to other peer\n");
1328  op->local_bf = NULL;
1329  if (GNUNET_YES == op->return_intersection)
1330  {
1332  "Sending full result set (%u elements)\n",
1333  GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1334  op->full_result_iter
1336  op->my_elements);
1338  return;
1339  }
1340  send_p2p_done (op);
1341  return;
1342  }
/* Sets still differ: keep exchanging filters. */
1343  op->phase = PHASE_BF_EXCHANGE;
1344  send_bloomfilter (op);
1345 }
1346 
1347 
/**
 * Validation hook for incoming Bloom filter messages.  Performs no checks
 * and accepts every message.
 *
 * NOTE(review): since nothing is validated here, all size checks on the
 * variable-length payload rest on the handler.  The line with the function
 * name and the `cls` parameter was dropped from this listing; the name is
 * inferred from the handler tables further down.
 *
 * @param msg the received message (not inspected)
 * @return always GNUNET_OK
 */
1355 static int
1357  const struct BFMessage *msg)
1358 {
1359  struct Operation *op = cls;
1360 
1361  (void) op;
1362  return GNUNET_OK;
1363 }
1364 
1365 
/**
 * Handle a Bloom filter message from the other peer.  A message whose
 * payload equals the announced total length is a complete filter and is
 * processed immediately; otherwise the payload is one chunk of a multipart
 * filter that is accumulated in `op->bf_data` until the announced length is
 * reached.
 *
 * NOTE(review): the function-name line, the statements in the error
 * branches and the head of the Bloom filter construction for the multipart
 * case were dropped from this listing.
 *
 * @param msg the received message; payload follows the fixed header
 */
1372 static void
1374  const struct BFMessage *msg)
1375 {
1376  struct Operation *op = cls;
1377  uint32_t bf_size;
1378  uint32_t chunk_size;
1379  uint32_t bf_bits_per_element;
1380 
1381  switch (op->phase)
1382  {
1383  case PHASE_INITIAL:
1384  GNUNET_break_op (0);
1386  return;
1387 
1388  case PHASE_COUNT_SENT:
1389  case PHASE_BF_EXCHANGE:
1390  bf_size = ntohl (msg->bloomfilter_total_length);
1391  bf_bits_per_element = ntohl (msg->bits_per_element);
/* NOTE(review): this converts a network-order field, so ntohs() is the
   intended call; htons() performs the same byte swap, so the value is
   correct, but the name is misleading. */
1392  chunk_size = htons (msg->header.size) - sizeof(struct BFMessage);
1393  op->other_xor = msg->element_xor_hash;
1394  if (bf_size == chunk_size)
1395  {
1396  if (NULL != op->bf_data)
1397  {
1398  GNUNET_break_op (0);
1400  return;
1401  }
1402  /* single part, done here immediately */
1403  op->remote_bf
1404  = GNUNET_CONTAINER_bloomfilter_init ((const char *) &msg[1],
1405  bf_size,
1406  bf_bits_per_element);
1407  op->salt = ntohl (msg->sender_mutator);
1408  op->remote_element_count = ntohl (msg->sender_element_count);
1409  process_bf (op);
1410  break;
1411  }
1412  /* multipart chunk */
1413  if (NULL == op->bf_data)
1414  {
1415  /* first chunk, initialize */
/* NOTE(review): unlike the else branch, nothing here checks that
   chunk_size <= bf_size.  A peer announcing a total length smaller than
   the payload of its first chunk would make the memcpy below write past
   this allocation -- add a bounds check before copying. */
1416  op->bf_data = GNUNET_malloc (bf_size);
1417  op->bf_data_size = bf_size;
1418  op->bf_bits_per_element = bf_bits_per_element;
1419  op->bf_data_offset = 0;
1420  op->salt = ntohl (msg->sender_mutator);
1421  op->remote_element_count = ntohl (msg->sender_element_count);
1422  }
1423  else
1424  {
1425  /* increment */
/* Every chunk must repeat the metadata of the first one and must not
   run past the announced total length. */
1426  if ((op->bf_data_size != bf_size) ||
1427  (op->bf_bits_per_element != bf_bits_per_element) ||
1428  (op->bf_data_offset + chunk_size > bf_size) ||
1429  (op->salt != ntohl (msg->sender_mutator)) ||
1430  (op->remote_element_count != ntohl (msg->sender_element_count)))
1431  {
1432  GNUNET_break_op (0);
1434  return;
1435  }
1436  }
1437  GNUNET_memcpy (&op->bf_data[op->bf_data_offset],
1438  (const char *) &msg[1],
1439  chunk_size);
1440  op->bf_data_offset += chunk_size;
1441  if (op->bf_data_offset == bf_size)
1442  {
1443  /* last chunk, run! */
1444  op->remote_bf
1446  bf_size,
1447  bf_bits_per_element);
1448  GNUNET_free (op->bf_data);
1449  op->bf_data = NULL;
1450  op->bf_data_size = 0;
1451  process_bf (op);
1452  }
1453  break;
1454 
1455  default:
1456  GNUNET_break_op (0);
1458  return;
1459  }
1460  GNUNET_CADET_receive_done (op->channel);
1461 }
1462 
1463 
/**
 * Hash map iterator that discards every element it visits: the element is
 * taken out of the count and XOR checksum, removed from the working map and
 * reported to the client.
 *
 * NOTE(review): the heads of the log call, the asserted map removal and the
 * removal notification were dropped from this listing.
 *
 * @param cls the `struct Operation`
 * @param key unused
 * @param value the `struct ElementEntry` being visited
 * @return always GNUNET_YES, so iteration continues
 */
1472 static int
1473 filter_all (void *cls,
1474  const struct GNUNET_HashCode *key,
1475  void *value)
1476 {
1477  struct Operation *op = cls;
1478  struct ElementEntry *ee = value;
1479 
1480  GNUNET_break (0 < op->my_element_count);
1481  op->my_element_count--;
1482  GNUNET_CRYPTO_hash_xor (&op->my_xor,
1483  &ee->element_hash,
1484  &op->my_xor);
1486  "Final reduction of my_elements, removing %s:%u\n",
1487  GNUNET_h2s (&ee->element_hash),
1488  ee->element.size);
1491  &ee->element_hash,
1492  ee));
1494  &ee->element);
1495  return GNUNET_YES;
1496 }
1497 
1498 
/**
 * Handle the other peer's DONE message.  Only valid during the Bloom filter
 * exchange.  If the peer reports an empty intersection all our remaining
 * elements are discarded; afterwards our element count and XOR checksum
 * must match the peer's, otherwise the message is rejected.  On agreement
 * the result is delivered to the client.
 *
 * NOTE(review): the function-name line, the statements in the error
 * branches, the head of the iteration that applies filter_all, log call
 * heads, the creation/start of the full-result iterator and the final
 * statement (listing line 1554) were dropped from this listing.
 *
 * @param idm the received DONE message
 */
1505 static void
1507  const struct IntersectionDoneMessage *idm)
1508 {
1509  struct Operation *op = cls;
1510 
1511  if (PHASE_BF_EXCHANGE != op->phase)
1512  {
1513  /* wrong phase to conclude? FIXME: Or should we allow this
1514  if the other peer has _initially_ already an empty set? */
1515  GNUNET_break_op (0);
1517  return;
1518  }
1519  if (0 == ntohl (idm->final_element_count))
1520  {
1521  /* other peer determined empty set is the intersection,
1522  remove all elements */
1524  &filter_all,
1525  op);
1526  }
1527  if ((op->my_element_count != ntohl (idm->final_element_count)) ||
1528  (0 != GNUNET_memcmp (&op->my_xor,
1529  &idm->element_xor_hash)))
1530  {
1531  /* Other peer thinks we are done, but we disagree on the result! */
1532  GNUNET_break_op (0);
1534  return;
1535  }
1537  "Got IntersectionDoneMessage, have %u elements in intersection\n",
1538  op->my_element_count);
1539  op->phase = PHASE_DONE_RECEIVED;
1540  GNUNET_CADET_receive_done (op->channel);
1541 
1542  GNUNET_assert (GNUNET_NO == op->client_done_sent);
1543  if (GNUNET_YES == op->return_intersection)
1544  {
1546  "Sending full result set to client (%u elements)\n",
1547  GNUNET_CONTAINER_multihashmap_size (op->my_elements));
1548  op->full_result_iter
1551  return;
1552  }
1553  op->phase = PHASE_FINISHED;
1555 }
1556 
1557 
/**
 * Look up a pending incoming operation by the ID under which it was
 * suggested to a client.  Scans the incoming-operation list of every
 * listener in the global listener list.
 *
 * @param id suggest ID to search for
 * @return the first operation whose `suggest_id` equals @a id, or NULL if
 *         no listener holds such an operation
 */
1566 static struct Operation *
1567 get_incoming (uint32_t id)
1568 {
1569  for (struct Listener *listener = listener_head; NULL != listener;
1570  listener = listener->next)
1571  {
1572  for (struct Operation *op = listener->op_head; NULL != op; op = op->next)
1573  if (op->suggest_id == id)
1574  return op;
1575  }
1576  return NULL;
1577 }
1578 
1579 
/**
 * Callback for a newly connected client: counts the client and allocates
 * its per-client state.
 *
 * NOTE(review): the line with the function name and the `cls` parameter was
 * dropped from this listing.
 *
 * @param c handle of the client that connected
 * @param mq message queue for talking to the client
 * @return the new `struct ClientState`, which the disconnect callback
 *         receives back and frees
 */
1588 static void *
1590  struct GNUNET_SERVICE_Client *c,
1591  struct GNUNET_MQ_Handle *mq)
1592 {
1593  struct ClientState *cs;
1594 
1595  num_clients++;
1596  cs = GNUNET_new (struct ClientState);
1597  cs->client = c;
1598  cs->mq = mq;
1599  return cs;
1600 }
1601 
1602 
/**
 * Hash map iterator that frees the element entry it visits.
 *
 * NOTE(review): the line with the function name and the `cls` parameter was
 * dropped from this listing.
 *
 * @param key unused
 * @param value the `struct ElementEntry` to free
 * @return always GNUNET_YES, so iteration continues
 */
1611 static int
1613  const struct GNUNET_HashCode *key,
1614  void *value)
1615 {
1616  struct ElementEntry *ee = value;
1617 
1618  GNUNET_free (ee);
1619  return GNUNET_YES;
1620 }
1621 
1622 
/**
 * Callback for a disconnecting client: tears down the client's set (pending
 * operations, then the reference-counted content), the client's listener
 * (port and all pending incoming operations), the per-client state, and
 * finally the CADET handle if this was the last client during shutdown.
 *
 * NOTE(review): the function-name line, the body of the loop over
 * `set->ops_head`, the heads of the element iteration and map destruction,
 * the log call head, the removal of the listener from the global list and
 * the CADET disconnect call were dropped from this listing.
 *
 * @param client handle of the client (unused in the visible code)
 * @param internal_cls the `struct ClientState` returned by the connect
 *        callback
 */
1630 static void
1632  struct GNUNET_SERVICE_Client *client,
1633  void *internal_cls)
1634 {
1635  struct ClientState *cs = internal_cls;
1636  struct Operation *op;
1637  struct Listener *listener;
1638  struct Set *set;
1639 
1640  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client disconnected, cleaning up\n");
1641  if (NULL != (set = cs->set))
1642  {
1643  struct SetContent *content = set->content;
1644 
1645  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's set\n");
1646  /* Destroy pending set operations */
1647  while (NULL != set->ops_head)
1649 
1650  /* free set content (or at least decrement RC) */
1651  set->content = NULL;
1652  GNUNET_assert (0 != content->refcount);
1653  content->refcount--;
1654  if (0 == content->refcount)
1655  {
1656  GNUNET_assert (NULL != content->elements);
1659  NULL);
1661  content->elements = NULL;
1662  GNUNET_free (content);
1663  }
1664  GNUNET_free (set);
1665  }
1666 
1667  if (NULL != (listener = cs->listener))
1668  {
1669  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Destroying client's listener\n");
1670  GNUNET_CADET_close_port (listener->open_port);
1671  listener->open_port = NULL;
/* incoming_destroy() unlinks the operation from this list, so the loop
   ends once the list is empty. */
1672  while (NULL != (op = listener->op_head))
1673  {
1675  "Destroying incoming operation `%u' from peer `%s'\n",
1676  (unsigned int) op->client_request_id,
1677  GNUNET_i2s (&op->peer));
1678  incoming_destroy (op);
1679  }
1681  GNUNET_free (listener);
1682  }
1683  GNUNET_free (cs);
1684  num_clients--;
1685  if ((GNUNET_YES == in_shutdown) && (0 == num_clients))
1686  {
1687  if (NULL != cadet)
1688  {
1690  cadet = NULL;
1691  }
1692  }
1693 }
1694 
1695 
/**
 * Validate an operation request received from a remote peer.  Rejects a
 * second request on the same operation, a request on an operation that no
 * longer belongs to a listener, and a nested context message that exceeds
 * GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE.
 *
 * NOTE(review): the line with the function name and the `cls` parameter was
 * dropped from this listing; the name is inferred from the handler tables
 * further down.
 *
 * @param msg the received request
 * @return GNUNET_OK if the request is acceptable, GNUNET_SYSERR otherwise
 */
1704 static int
1706  const struct OperationRequestMessage *msg)
1707 {
1708  struct Operation *op = cls;
1709  struct Listener *listener = op->listener;
1710  const struct GNUNET_MessageHeader *nested_context;
1711 
1712  /* double operation request */
1713  if (0 != op->suggest_id)
1714  {
1715  GNUNET_break_op (0);
1716  return GNUNET_SYSERR;
1717  }
1718  /* This should be equivalent to the previous condition, but can't hurt to check twice */
1719  if (NULL == listener)
1720  {
1721  GNUNET_break (0);
1722  return GNUNET_SYSERR;
1723  }
1724  nested_context = GNUNET_MQ_extract_nested_mh (msg);
1725  if ((NULL != nested_context) &&
1726  (ntohs (nested_context->size) > GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE))
1727  {
1728  GNUNET_break_op (0);
1729  return GNUNET_SYSERR;
1730  }
1731  return GNUNET_OK;
1732 }
1733 
1734 
/**
 * Handle a validated operation request from a remote peer: keep a copy of
 * the application context, record the peer's element count, assign a fresh
 * non-zero suggest ID, cancel the incoming timeout and forward the request
 * to the listening client.
 *
 * NOTE(review): the function-name line, the log-level arguments and the
 * message type of the request sent to the client were dropped from this
 * listing.
 *
 * @param msg the received request
 */
1752 static void
1754  const struct OperationRequestMessage *msg)
1755 {
1756  struct Operation *op = cls;
1757  struct Listener *listener = op->listener;
1758  const struct GNUNET_MessageHeader *nested_context;
1759  struct GNUNET_MQ_Envelope *env;
1760  struct GNUNET_SETI_RequestMessage *cmsg;
1761 
1762  nested_context = GNUNET_MQ_extract_nested_mh (msg);
1763  /* Make a copy of the nested_context (application-specific context
1764  information that is opaque to set) so we can pass it to the
1765  listener later on */
1766  if (NULL != nested_context)
1767  op->context_msg = GNUNET_copy_message (nested_context);
1768  op->remote_element_count = ntohl (msg->element_count);
1769  GNUNET_log (
1771  "Received P2P operation request (port %s) for active listener\n",
1772  GNUNET_h2s (&op->listener->app_id));
1773  GNUNET_assert (0 == op->suggest_id);
/* 0 means "not suggested yet", so skip it if the counter wrapped or is
   still at its initial value. */
1774  if (0 == suggest_id)
1775  suggest_id++;
1776  op->suggest_id = suggest_id++;
1777  GNUNET_assert (NULL != op->timeout_task);
1778  GNUNET_SCHEDULER_cancel (op->timeout_task);
1779  op->timeout_task = NULL;
1780  env = GNUNET_MQ_msg_nested_mh (cmsg,
1782  op->context_msg);
1783  GNUNET_log (
1785  "Suggesting incoming request with accept id %u to listener %p of client %p\n",
1786  op->suggest_id,
1787  listener,
1788  listener->cs);
1789  cmsg->accept_id = htonl (op->suggest_id);
1790  cmsg->peer_id = op->peer;
1791  GNUNET_MQ_send (listener->cs->mq, env);
1792  /* NOTE: GNUNET_CADET_receive_done() will be called in
1793  #handle_client_accept() */
1794 }
1795 
1796 
/**
 * Handle a client's request to create a set.  A client may own only one
 * set; a second request is treated as a protocol violation.  Otherwise a
 * new set with fresh content (reference count 1) is attached to the client.
 *
 * NOTE(review): the function-name line, the log call head, the statement in
 * the error branch (listing line 1818), the head of the element map
 * creation and the final statement (listing line 1828) were dropped.
 *
 * @param msg the create message (not inspected in the visible code)
 */
1805 static void
1807  const struct GNUNET_SETI_CreateMessage *msg)
1808 {
1809  struct ClientState *cs = cls;
1810  struct Set *set;
1811 
1813  "Client created new intersection set\n");
1814  if (NULL != cs->set)
1815  {
1816  /* There can only be one set per client */
1817  GNUNET_break (0);
1819  return;
1820  }
1821  set = GNUNET_new (struct Set);
1822  set->content = GNUNET_new (struct SetContent);
1823  set->content->refcount = 1;
1825  GNUNET_YES);
1826  set->cs = cs;
1827  cs->set = set;
1829 }
1830 
1831 
/**
 * Scheduler task run when an incoming operation has waited too long:
 * destroys the operation.
 *
 * NOTE(review): the function-name/parameter line (which must declare `cls`)
 * and the log call head were dropped from this listing.
 */
1841 static void
1843 {
1844  struct Operation *op = cls;
1845 
/* The task is running, so it must not be cancelled again by
   incoming_destroy(). */
1846  op->timeout_task = NULL;
1848  "Remote peer's incoming request timed out\n");
1849  incoming_destroy (op);
1850 }
1851 
1852 
/**
 * CADET callback for a new inbound channel on a listener's port.  Creates
 * an operation bound to the listener and the channel and appends it to the
 * listener's list of incoming operations.
 *
 * NOTE(review): the log call head, the statement ending in `UINT32_MAX`
 * (presumably the salt initialisation), the head of the call receiving `op`
 * at listing line 1888 (presumably the timeout scheduling) and the head of
 * the list insertion were dropped from this listing.
 *
 * @param cls the `struct Listener` the port belongs to
 * @param channel the new channel
 * @param source identity of the peer that opened the channel
 * @return the new operation, which serves as the channel's context
 */
1869 static void *
1870 channel_new_cb (void *cls,
1871  struct GNUNET_CADET_Channel *channel,
1872  const struct GNUNET_PeerIdentity *source)
1873 {
1874  struct Listener *listener = cls;
1875  struct Operation *op;
1876 
1878  "New incoming channel\n");
1879  op = GNUNET_new (struct Operation);
1880  op->listener = listener;
1881  op->peer = *source;
1882  op->channel = channel;
1883  op->mq = GNUNET_CADET_get_mq (op->channel);
1885  UINT32_MAX);
1888  op);
1890  listener->op_tail,
1891  op);
1892  return op;
1893 }
1894 
1895 
/**
 * CADET callback for a channel that was closed.  Clears the operation's
 * channel pointer before further cleanup.
 *
 * NOTE(review): the statement that performs the cleanup (listing line 1919)
 * was dropped from this listing.
 *
 * @param channel_ctx the `struct Operation` associated with the channel
 * @param channel the channel that ended (unused in the visible code)
 */
1912 static void
1913 channel_end_cb (void *channel_ctx,
1914  const struct GNUNET_CADET_Channel *channel)
1915 {
1916  struct Operation *op = channel_ctx;
1917 
1918  op->channel = NULL;
1920 }
1921 
1922 
/**
 * Window-size-change callback for a channel.  Intentionally empty.
 *
 * NOTE(review): the line with the function name and the `cls` parameter was
 * dropped from this listing.
 *
 * @param channel the channel concerned (unused)
 * @param window_size the new window size (unused)
 */
1937 static void
1939  const struct GNUNET_CADET_Channel *channel,
1940  int window_size)
1941 {
1942  /* FIXME: not implemented, we could do flow control here... */
1943 }
1944 
1945 
/**
 * Handle a client's request to listen for incoming intersection requests.
 * A client may have only one listener.  The new listener is registered
 * globally and a CADET port is opened on the requested application ID with
 * the peer-to-peer message handlers.
 *
 * NOTE(review): the function-name line, the message-type arguments of the
 * handler table, the table terminator, the statement in the error branch,
 * the head of the global list insertion, the log call head, one argument of
 * the open-port call (listing line 1999) and the final statement (listing
 * line 2002) were dropped from this listing.
 *
 * @param msg the listen message; supplies the application ID
 */
1952 static void
1954  const struct GNUNET_SETI_ListenMessage *msg)
1955 {
1956  struct ClientState *cs = cls;
/* Handlers are registered with a NULL closure; the visible handlers read
   the operation from their `cls` argument. */
1957  struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
1958  GNUNET_MQ_hd_var_size (incoming_msg,
1960  struct OperationRequestMessage,
1961  NULL),
1962  GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
1965  NULL),
1966  GNUNET_MQ_hd_var_size (intersection_p2p_bf,
1968  struct BFMessage,
1969  NULL),
1970  GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
1972  struct IntersectionDoneMessage,
1973  NULL),
1975  };
1976  struct Listener *listener;
1977 
1978  if (NULL != cs->listener)
1979  {
1980  /* max. one active listener per client! */
1981  GNUNET_break (0);
1983  return;
1984  }
1985  listener = GNUNET_new (struct Listener);
1986  listener->cs = cs;
1987  cs->listener = listener;
1988  listener->app_id = msg->app_id;
1990  listener_tail,
1991  listener);
1993  "New listener for set intersection created (port %s)\n",
1994  GNUNET_h2s (&listener->app_id));
1995  listener->open_port = GNUNET_CADET_open_port (cadet,
1996  &msg->app_id,
1997  &channel_new_cb,
1998  listener,
2000  &channel_end_cb,
2001  cadet_handlers);
2003 }
2004 
2005 
/**
 * Handle a client's rejection of a suggested incoming request.  Looks the
 * operation up by the ID in the message; an unknown ID is logged and
 * otherwise tolerated.
 *
 * NOTE(review): the function-name line, the log call heads, the statement
 * before the early return (listing line 2028) and the two final statements
 * (listing lines 2034-2035, presumably what disposes of the rejected
 * operation) were dropped from this listing.
 *
 * @param msg the reject message; carries the accept/reject ID
 */
2013 static void
2015  const struct GNUNET_SETI_RejectMessage *msg)
2016 {
2017  struct ClientState *cs = cls;
2018  struct Operation *op;
2019 
2020  op = get_incoming (ntohl (msg->accept_reject_id));
2021  if (NULL == op)
2022  {
2023  /* no matching incoming operation for this reject;
2024  could be that the other peer already disconnected... */
2026  "Client rejected unknown operation %u\n",
2027  (unsigned int) ntohl (msg->accept_reject_id));
2029  return;
2030  }
2032  "Peer request (app %s) rejected by client\n",
2033  GNUNET_h2s (&cs->listener->app_id));
2036 }
2037 
2038 
/**
 * Validation hook for a client's add-element message.  Performs no checks
 * and accepts every message.
 *
 * NOTE(review): the line with the function name and the `cls` parameter was
 * dropped from this listing.
 *
 * @param msg the message (not inspected)
 * @return always GNUNET_OK
 */
2045 static int
2047  const struct GNUNET_SETI_ElementMessage *msg)
2048 {
2049  /* NOTE: Technically, we should probably check with the
2050  block library whether the element we are given is well-formed */
2051  return GNUNET_OK;
2052 }
2053 
2054 
/**
 * Handle a client's request to add an element to its set.  The element's
 * payload is everything behind the fixed message header.  If no entry with
 * the same hash exists, a new entry is allocated with the payload stored
 * directly behind the entry structure and inserted into the set content;
 * a duplicate insert is logged and ignored.
 *
 * NOTE(review): the function-name line, the statement in the error branch,
 * the statement at listing line 2078, the heads of the hash computation and
 * the map lookup, log call heads, the statements at listing lines 2099-2100
 * (presumably including the `generation_added` assignment), the tail of the
 * map put call and the final statement (listing line 2115) were dropped.
 *
 * @param msg the add-element message
 */
2061 static void
2063  const struct GNUNET_SETI_ElementMessage *msg)
2064 {
2065  struct ClientState *cs = cls;
2066  struct Set *set;
2067  struct GNUNET_SETI_Element el;
2068  struct ElementEntry *ee;
2069  struct GNUNET_HashCode hash;
2070 
2071  if (NULL == (set = cs->set))
2072  {
2073  /* client without a set requested an operation */
2074  GNUNET_break (0);
2076  return;
2077  }
2079  el.size = ntohs (msg->header.size) - sizeof(*msg);
2080  el.data = &msg[1];
2081  el.element_type = ntohs (msg->element_type);
2083  &hash);
2085  &hash);
2086  if (NULL == ee)
2087  {
2089  "Client inserts element %s of size %u\n",
2090  GNUNET_h2s (&hash),
2091  el.size);
/* Entry header and payload share one allocation; `element.data` points
   into it. */
2092  ee = GNUNET_malloc (el.size + sizeof(*ee));
2093  ee->element.size = el.size;
2094  GNUNET_memcpy (&ee[1], el.data, el.size);
2095  ee->element.data = &ee[1];
2096  ee->element.element_type = el.element_type;
2097  ee->remote = GNUNET_NO;
2098  ee->element_hash = hash;
2101  set->content->elements,
2102  &ee->element_hash,
2103  ee,
2105  }
2106  else
2107  {
2109  "Client inserted element %s of size %u twice (ignored)\n",
2110  GNUNET_h2s (&hash),
2111  el.size);
2112  /* same element inserted twice */
/* NOTE(review): this path returns before the final statement of the
   function (dropped at listing line 2115); if that statement is what
   lets the client continue sending, the duplicate path would skip it --
   confirm against the upstream file. */
2113  return;
2114  }
2116 }
2117 
2118 
/**
 * Advance the current generation of a set.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing; the cross-reference index gives it as
 * advance_generation (struct Set *set).
 *
 * When the set's current generation equals the latest generation of its
 * content, both counters are incremented together and the function returns.
 * What happens otherwise is on a line (2134) that is not part of this
 * listing.
 */
2125 static void
2127 {
2128  if (set->current_generation == set->content->latest_generation)
2129  {
2130  set->content->latest_generation++;
2131  set->current_generation++;
2132  return;
2133  }
2135 }
2136 
2137 
/**
 * Validation hook for a client's request to initiate a set operation with
 * another peer.
 *
 * NOTE(review): the line carrying the function name and first parameter is
 * missing from this listing; the cross-reference index names the function
 * with this signature check_client_evaluate.
 *
 * No validation is performed: @a msg is not inspected.
 *
 * @param msg the evaluate message to validate (currently unused)
 * @return always #GNUNET_OK
 */
2147 static int
2149  const struct GNUNET_SETI_EvaluateMessage *msg)
2150 {
2151  /* FIXME: suboptimal, even if the context below could be NULL,
2152  there are malformed messages this does not check for... */
2153  return GNUNET_OK;
2154 }
2155 
2156 
/**
 * Called when a client wants to initiate a set operation with another peer.
 *
 * Allocates a new operation, binds it to the client's set, opens a CADET
 * channel to the target peer and sends the initial operation request that
 * carries our element count.
 *
 * NOTE(review): many lines of this function are missing from this listing
 * (function name, message type constants, the heads of several calls, the
 * assignment of `context`); comments below only describe visible lines.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the evaluate request from the client
 */
2165 static void
2167  const struct GNUNET_SETI_EvaluateMessage *msg)
2168 {
2169  struct ClientState *cs = cls;
2170  struct Operation *op = GNUNET_new (struct Operation);
     /* Handlers for messages arriving on the CADET channel; each one
        receives `op` as its closure. */
2171  const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
2172  GNUNET_MQ_hd_var_size (incoming_msg,
2174  struct OperationRequestMessage,
2175  op),
2176  GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
2179  op),
2180  GNUNET_MQ_hd_var_size (intersection_p2p_bf,
2182  struct BFMessage,
2183  op),
2184  GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
2186  struct IntersectionDoneMessage,
2187  op),
2189  };
2190  struct Set *set;
2191  const struct GNUNET_MessageHeader *context;
2192 
2193  if (NULL == (set = cs->set))
2194  {
     /* Client has no set: release the operation allocated above. */
2195  GNUNET_break (0);
2196  GNUNET_free (op);
2198  return;
2199  }
2201  UINT32_MAX);
2202  op->peer = msg->target_peer;
     /* NOTE(review): an incoming field is converted with htonl(); ntohl()
        would be the conventional spelling for network-to-host. */
2203  op->return_intersection = htonl (msg->return_intersection);
     /* NOTE(review): unconditional diagnostic written straight to stderr
        rather than through the logging facility. */
2204  fprintf (stderr,
2205  "Return intersection for evaluate is %d\n",
2206  op->return_intersection);
2207  op->client_request_id = ntohl (msg->request_id);
2209 
2210  /* Advance generation values, so that
2211  mutations won't interfere with the running operation. */
2212  op->set = set;
2213  op->generation_created = set->current_generation;
2214  advance_generation (set);
2216  set->ops_tail,
2217  op);
2219  "Creating new CADET channel to port %s for set intersection\n",
2220  GNUNET_h2s (&msg->app_id));
     /* `op` is the channel closure; the application id serves as the
        CADET port. */
2221  op->channel = GNUNET_CADET_channel_create (cadet,
2222  op,
2223  &msg->target_peer,
2224  &msg->app_id,
2226  &channel_end_cb,
2227  cadet_handlers);
2228  op->mq = GNUNET_CADET_get_mq (op->channel);
2229  {
2230  struct GNUNET_MQ_Envelope *ev;
     /* NOTE(review): this local shadows the `msg` parameter; inside this
        block `msg` refers to the outgoing operation request. */
2231  struct OperationRequestMessage *msg;
2232 
2235  context);
2236  if (NULL == ev)
2237  {
2238  /* the context message is too large!? */
2239  GNUNET_break (0);
2241  return;
2242  }
2244  "Initiating intersection operation evaluation\n");
2245  /* we started the operation, thus we have to send the operation request */
2246  op->phase = PHASE_INITIAL;
2247  op->my_element_count = op->set->current_set_element_count;
2248  op->my_elements
2249  = GNUNET_CONTAINER_multihashmap_create (op->my_element_count,
2250  GNUNET_YES);
2251 
2252  msg->element_count = htonl (op->my_element_count);
2253  GNUNET_MQ_send (op->mq,
2254  ev);
2255  op->phase = PHASE_COUNT_SENT;
2256  if (NULL != context)
2258  "Sent op request with context message\n");
2259  else
2261  "Sent op request without context message\n");
2262  }
2264 }
2265 
2266 
/**
 * Handle a request from the client to cancel a running set operation.
 *
 * Searches the operations of the client's set for one whose
 * client_request_id matches the request id in @a msg. A request naming an
 * operation that no longer exists is only logged, not treated as an error.
 *
 * NOTE(review): the line carrying the function name, the heads of the log
 * calls and the statements at lines 2286, 2314 and 2316 are missing from
 * this listing.
 *
 * @param cls the `struct ClientState` of the requesting client
 * @param msg the cancel request
 */
2273 static void
2275  const struct GNUNET_SETI_CancelMessage *msg)
2276 {
2277  struct ClientState *cs = cls;
2278  struct Set *set;
2279  struct Operation *op;
2280  int found;
2281 
2282  if (NULL == (set = cs->set))
2283  {
2284  /* client without a set requested an operation */
2285  GNUNET_break (0);
2287  return;
2288  }
     /* Linear scan over the set's operation list; `op` is left pointing
        at the match when one is found. */
2289  found = GNUNET_NO;
2290  for (op = set->ops_head; NULL != op; op = op->next)
2291  {
2292  if (op->client_request_id == ntohl (msg->request_id))
2293  {
2294  found = GNUNET_YES;
2295  break;
2296  }
2297  }
2298  if (GNUNET_NO == found)
2299  {
2300  /* It may happen that the operation was already destroyed due to
2301  * the other peer disconnecting. The client may not know about this
2302  * yet and try to cancel the (just barely non-existent) operation.
2303  * So this is not a hard error.
2304  */
2306  "Client canceled non-existent op %u\n",
2307  (uint32_t) ntohl (msg->request_id));
2308  }
2309  else
2310  {
2312  "Client requested cancel for op %u\n",
2313  (uint32_t) ntohl (msg->request_id));
2315  }
2317 }
2318 
2319 
/**
 * Handle a request from the client to accept a set operation that came
 * from a remote peer.
 *
 * Looks up the pending incoming operation by the accept/reject id, moves it
 * from its listener to the client's set, and then either sends only our
 * element count or starts with the Bloom filter, depending on which side
 * has fewer elements.
 *
 * NOTE(review): many lines of this function are missing from this listing
 * (function name, heads of the log / list / hashmap calls, the calls inside
 * both branches of the element-count comparison); comments below only
 * describe visible lines.
 *
 * @param cls the `struct ClientState` of the accepting client
 * @param msg the accept message from the client
 */
2328 static void
2330  const struct GNUNET_SETI_AcceptMessage *msg)
2331 {
2332  struct ClientState *cs = cls;
2333  struct Set *set;
2334  struct Operation *op;
2335  struct GNUNET_SETI_ResultMessage *result_message;
2336  struct GNUNET_MQ_Envelope *ev;
2337  struct Listener *listener;
2338 
2339  if (NULL == (set = cs->set))
2340  {
2341  /* client without a set requested to accept */
2342  GNUNET_break (0);
2344  return;
2345  }
2346  op = get_incoming (ntohl (msg->accept_reject_id));
2347  if (NULL == op)
2348  {
2349  /* It is not an error if the set op does not exist -- it may
2350  * have been destroyed when the partner peer disconnected. */
2351  GNUNET_log (
2353  "Client %p accepted request %u of listener %p that is no longer active\n",
2354  cs,
2355  ntohl (msg->accept_reject_id),
2356  cs->listener);
     /* Report failure to the client; the request id is copied verbatim
        from the accept message (no byte-order conversion). */
2357  ev = GNUNET_MQ_msg (result_message,
2359  result_message->request_id = msg->request_id;
2360  result_message->result_status = htons (GNUNET_SETI_STATUS_FAILURE);
2361  GNUNET_MQ_send (set->cs->mq, ev);
2363  return;
2364  }
2366  "Client accepting request %u\n",
2367  (uint32_t) ntohl (msg->accept_reject_id));
     /* Remember the listener before clearing the back-pointer; it is
        still needed by the (truncated) list call below. */
2368  listener = op->listener;
2369  op->listener = NULL;
     /* NOTE(review): an incoming field is converted with htonl(); ntohl()
        would be the conventional spelling for network-to-host. */
2370  op->return_intersection = htonl (msg->return_intersection);
     /* NOTE(review): unconditional diagnostic written straight to stderr
        rather than through the logging facility. */
2371  fprintf (stderr,
2372  "Return intersection for accept is %d\n",
2373  op->return_intersection);
2375  listener->op_tail,
2376  op);
2377  op->set = set;
2379  set->ops_tail,
2380  op);
2381  op->client_request_id = ntohl (msg->request_id);
2382 
2383  /* Advance generation values, so that future mutations do not
2384  interfere with the running operation. */
2385  op->generation_created = set->current_generation;
2386  advance_generation (set);
2387  {
2389  "Accepting set intersection operation\n");
2390  op->phase = PHASE_INITIAL;
2391  op->my_element_count
2392  = op->set->current_set_element_count;
     /* The (truncated) call assigning my_elements is given the smaller
        of the two element counts as its first visible argument. */
2393  op->my_elements
2395  GNUNET_MIN (op->my_element_count,
2396  op->remote_element_count),
2397  GNUNET_YES);
2398  if (op->remote_element_count < op->my_element_count)
2399  {
2400  /* If the other peer (Alice) has fewer elements than us (Bob),
2401  we just send the count as Alice should send the first BF */
2403  op->phase = PHASE_COUNT_SENT;
2404  }
2405  else
2406  {
2407  /* We have fewer elements, so we start with the BF */
2409  }
2410  }
2411  /* Now allow CADET to continue, as we did not do this in
2412  #handle_incoming_msg (as we wanted to first see if the
2413  local client would accept the request). */
2414  GNUNET_CADET_receive_done (op->channel);
2416 }
2417 
2418 
/**
 * Called to clean up, after a shutdown has been requested.
 *
 * If no clients remain and a CADET handle exists, the handle is reset to
 * NULL (the statement preceding that assignment, line 2433, is missing
 * from this listing). Lines 2428, 2437 and 2439 are missing as well, so
 * only the tail of two further calls is visible.
 *
 * @param cls closure (not used in the visible lines)
 */
2424 static void
2425 shutdown_task (void *cls)
2426 {
2427  /* Delay actual shutdown to allow service to disconnect clients */
2429  if (0 == num_clients)
2430  {
2431  if (NULL != cadet)
2432  {
2434  cadet = NULL;
2435  }
2436  }
2438  GNUNET_YES);
2440  "handled shutdown request\n");
2441 }
2442 
2443 
/**
 * Function called by the service's run method to run service-specific
 * setup code.
 *
 * If the CADET handle is NULL after setup, an error mentioning the failed
 * CADET connection is logged and the function returns early.
 *
 * NOTE(review): the third parameter line (2455) and the heads of the setup
 * calls (lines 2460, 2462, 2464, 2467, 2469) are missing from this listing;
 * the cross-reference index gives the third parameter as
 * `struct GNUNET_SERVICE_Handle *service`.
 *
 * @param cls closure
 * @param cfg configuration to use
 */
2452 static void
2453 run (void *cls,
2454  const struct GNUNET_CONFIGURATION_Handle *cfg,
2456 {
2457  /* FIXME: need to modify SERVICE (!) API to allow
2458  us to run a shutdown task *after* clients were
2459  forcefully disconnected! */
2461  NULL);
2463  cfg);
2465  if (NULL == cadet)
2466  {
2468  _ ("Could not connect to CADET service\n"));
2470  return;
2471  }
2472 }
2473 
2474 
/**
 * Define "main" method using service macro.
 *
 * NOTE(review): the line carrying the macro name, the option / callback
 * arguments and the message type constants and struct names of each handler
 * are missing from this listing. The visible lines register the "seti"
 * service with `run` as its setup function and one client message handler
 * each for accept, set_add, create_set, evaluate, listen, reject and
 * cancel. set_add and evaluate are variable-size handlers; the others are
 * fixed-size.
 */
2479  "seti",
2481  &run,
2484  NULL,
2485  GNUNET_MQ_hd_fixed_size (client_accept,
2488  NULL),
2489  GNUNET_MQ_hd_var_size (client_set_add,
2492  NULL),
2493  GNUNET_MQ_hd_fixed_size (client_create_set,
2496  NULL),
2497  GNUNET_MQ_hd_var_size (client_evaluate,
2500  NULL),
2501  GNUNET_MQ_hd_fixed_size (client_listen,
2504  NULL),
2505  GNUNET_MQ_hd_fixed_size (client_reject,
2508  NULL),
2509  GNUNET_MQ_hd_fixed_size (client_cancel,
2512  NULL),
2514 
2515 
2516 /* end of gnunet-service-seti.c */
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
static struct GNUNET_ARM_Operation * op
Current operation.
Definition: gnunet-arm.c:144
static int res
struct GNUNET_HashCode key
The key used in the DHT.
static GstElement * source
Appsrc instance into which we write data for the pipeline.
static pa_context * context
Pulseaudio context.
static char * value
Value of the record to add/remove.
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
IntersectionOperationPhase
Current phase we are in for an intersection operation.
static void _GSS_operation_destroy(struct Operation *op)
Destroy the given operation.
static void _GSS_operation_destroy2(struct Operation *op)
This function probably should not exist and be replaced by inlining more specific logic in the variou...
static int _GSS_is_element_of_operation(struct ElementEntry *ee, struct Operation *op)
Is element ee part of the set used by op?
static int check_client_evaluate(void *cls, const struct GNUNET_SETI_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_intersection_p2p_bf(void *cls, const struct BFMessage *msg)
Handle a BF message from a remote peer.
static uint32_t suggest_id
Counter for allocating unique IDs for clients, used to identify incoming operation requests from remo...
static int check_intersection_p2p_bf(void *cls, const struct BFMessage *msg)
Check a BF message from a remote peer.
@ PHASE_INITIAL
We are just starting.
@ PHASE_DONE_RECEIVED
We have received the P2P DONE message, and must finish with the local client before terminating the c...
@ PHASE_FINISHED
The protocol is over.
@ PHASE_BF_EXCHANGE
We have initialized our set and are now reducing it by exchanging Bloom filters until one party notic...
@ PHASE_COUNT_SENT
We have sent the number of our elements to the other peer, but did not set up our element set yet.
@ PHASE_MUST_SEND_DONE
We must next send the P2P DONE message (after finishing mostly with the local client).
static struct Listener * listener_head
Listeners are held in a doubly linked list.
static int iterator_bf_reduce(void *cls, const struct GNUNET_HashCode *key, void *value)
Removes elements from our hashmap if they are not contained within the provided remote bloomfilter.
static void handle_client_evaluate(void *cls, const struct GNUNET_SETI_EvaluateMessage *msg)
Called when a client wants to initiate a set operation with another peer.
static void handle_client_listen(void *cls, const struct GNUNET_SETI_ListenMessage *msg)
Called when a client wants to create a new listener.
static int check_client_set_add(void *cls, const struct GNUNET_SETI_ElementMessage *msg)
Called when a client wants to add or remove an element to a set it inhabits.
static void begin_bf_exchange(struct Operation *op)
We go first, initialize our map with all elements and send the first Bloom filter.
static struct GNUNET_CADET_Handle * cadet
Handle to the cadet service, used to listen for and connect to remote peers.
static int filter_all(void *cls, const struct GNUNET_HashCode *key, void *value)
Remove all elements from our hashmap.
static int filtered_map_initialization(void *cls, const struct GNUNET_HashCode *key, void *value)
Fills the "my_elements" hashmap with all relevant elements.
static void send_p2p_done(struct Operation *op)
Notify the other peer that we are done.
static void send_remaining_elements(void *cls)
Send all elements in the full result iterator.
static void handle_client_reject(void *cls, const struct GNUNET_SETI_RejectMessage *msg)
Called when the listening client rejects an operation request by another peer.
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
static void fail_intersection_operation(struct Operation *op)
Inform the client that the intersection operation has failed, and proceed to destroy the evaluate ope...
static void * channel_new_cb(void *cls, struct GNUNET_CADET_Channel *channel, const struct GNUNET_PeerIdentity *source)
Method called whenever another peer has added us to a channel the other peer initiated.
static void handle_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Handle a request for a set operation from another peer.
static int in_shutdown
Are we in shutdown? if GNUNET_YES and the number of clients drops to zero, disconnect from CADET.
static int destroy_elements_iterator(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries to free element entries.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
static struct GNUNET_STATISTICS_Handle * _GSS_statistics
Statistics handle.
static unsigned int num_clients
Number of active clients.
static void finished_local_operations(void *cls)
Remember that we are done dealing with the local client AND have sent the other peer our message that...
static void channel_window_cb(void *cls, const struct GNUNET_CADET_Channel *channel, int window_size)
Function called whenever an MQ-channel's transmission window size changes.
static void advance_generation(struct Set *set)
Advance the current generation of a set, adding exclusion ranges if necessary.
static int check_incoming_msg(void *cls, const struct OperationRequestMessage *msg)
Check a request for a set operation from another peer.
static void handle_client_accept(void *cls, const struct GNUNET_SETI_AcceptMessage *msg)
Handle a request from the client to accept a set operation that came from a remote peer.
static void send_client_done_and_destroy(void *cls)
Signal to the client that the operation has finished and destroy the operation.
static void send_client_removed_element(struct Operation *op, struct GNUNET_SETI_Element *element)
If applicable in the current operation mode, send a result message to the client indicating we remove...
static void incoming_destroy(struct Operation *op)
Destroy an incoming request from a remote peer.
static struct Listener * listener_tail
Listeners are held in a doubly linked list.
static int initialize_map_unfiltered(void *cls, const struct GNUNET_HashCode *key, void *value)
Fills the "my_elements" hashmap with the initial set of (non-deleted) elements from the set of the sp...
static struct Operation * get_incoming(uint32_t id)
Get the incoming socket associated with the given id.
static void send_bloomfilter(struct Operation *op)
Send a bloomfilter to our peer.
static void process_bf(struct Operation *op)
Process a Bloomfilter once we got all the chunks.
static void send_element_count(struct Operation *op)
Send our element count to the peer, in case our element count is lower than theirs.
static void handle_intersection_p2p_element_info(void *cls, const struct IntersectionElementInfoMessage *msg)
Handle the initial struct IntersectionElementInfoMessage from a remote peer.
#define INCOMING_CHANNEL_TIMEOUT
How long do we hold on to an incoming channel if there is no local listener before giving up?
static void handle_client_cancel(void *cls, const struct GNUNET_SETI_CancelMessage *msg)
Handle a request from the client to cancel a running set operation.
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *client, void *internal_cls)
Clean up after a client has disconnected.
static void channel_end_cb(void *channel_ctx, const struct GNUNET_CADET_Channel *channel)
Function called whenever a channel is destroyed.
static void handle_client_create_set(void *cls, const struct GNUNET_SETI_CreateMessage *msg)
Called when a client wants to create a new set.
static void incoming_timeout_cb(void *cls)
Timeout happens iff:
static int iterator_bf_create(void *cls, const struct GNUNET_HashCode *key, void *value)
Create initial bloomfilter based on all the elements given.
static void handle_client_set_add(void *cls, const struct GNUNET_SETI_ElementMessage *msg)
Called when a client wants to add an element to a set it inhabits.
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_SERVICE_Handle *service)
Function called by the service's run method to run service-specific setup code.
static void handle_intersection_p2p_done(void *cls, const struct IntersectionDoneMessage *idm)
Handle a done message from a remote peer.
GNUNET_SERVICE_MAIN("seti", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_accept, GNUNET_MESSAGE_TYPE_SETI_ACCEPT, struct GNUNET_SETI_AcceptMessage, NULL), GNUNET_MQ_hd_var_size(client_set_add, GNUNET_MESSAGE_TYPE_SETI_ADD, struct GNUNET_SETI_ElementMessage, NULL), GNUNET_MQ_hd_fixed_size(client_create_set, GNUNET_MESSAGE_TYPE_SETI_CREATE, struct GNUNET_SETI_CreateMessage, NULL), GNUNET_MQ_hd_var_size(client_evaluate, GNUNET_MESSAGE_TYPE_SETI_EVALUATE, struct GNUNET_SETI_EvaluateMessage, NULL), GNUNET_MQ_hd_fixed_size(client_listen, GNUNET_MESSAGE_TYPE_SETI_LISTEN, struct GNUNET_SETI_ListenMessage, NULL), GNUNET_MQ_hd_fixed_size(client_reject, GNUNET_MESSAGE_TYPE_SETI_REJECT, struct GNUNET_SETI_RejectMessage, NULL), GNUNET_MQ_hd_fixed_size(client_cancel, GNUNET_MESSAGE_TYPE_SETI_CANCEL, struct GNUNET_SETI_CancelMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Peer-to-Peer messages for gnunet set.
Library for data block manipulation.
CADET service; establish channels to distant peers.
Two-peer set intersection operations.
API to create, modify and access statistics.
GNUNET_NETWORK_STRUCT_END void GNUNET_BLOCK_mingle_hash(const struct GNUNET_HashCode *in, uint32_t mingle_number, struct GNUNET_HashCode *hc)
Mingle hash with the mingle_number to produce different bits.
Definition: block.c:96
int GNUNET_CONTAINER_bloomfilter_get_raw_data(const struct GNUNET_CONTAINER_BloomFilter *bf, char *data, size_t size)
Copy the raw data of this Bloom filter into the given data array.
void GNUNET_CONTAINER_bloomfilter_add(struct GNUNET_CONTAINER_BloomFilter *bf, const struct GNUNET_HashCode *e)
Add an element to the filter.
bool GNUNET_CONTAINER_bloomfilter_test(const struct GNUNET_CONTAINER_BloomFilter *bf, const struct GNUNET_HashCode *e)
Test if an element is in the filter.
struct GNUNET_CONTAINER_BloomFilter * GNUNET_CONTAINER_bloomfilter_init(const char *data, size_t size, unsigned int k)
Create a Bloom filter from raw bits.
void GNUNET_CONTAINER_bloomfilter_free(struct GNUNET_CONTAINER_BloomFilter *bf)
Free the space associated with a filter in memory, flush to drive if needed (do not free the space on...
struct GNUNET_CADET_Channel * GNUNET_CADET_channel_create(struct GNUNET_CADET_Handle *h, void *channel_cls, const struct GNUNET_PeerIdentity *destination, const struct GNUNET_HashCode *port, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Create a new channel towards a remote peer.
Definition: cadet_api.c:1015
void GNUNET_CADET_receive_done(struct GNUNET_CADET_Channel *channel)
Send an ack on the channel to confirm the processing of a message.
Definition: cadet_api.c:872
void GNUNET_CADET_channel_destroy(struct GNUNET_CADET_Channel *channel)
Destroy an existing channel.
Definition: cadet_api.c:830
void GNUNET_CADET_disconnect(struct GNUNET_CADET_Handle *handle)
Disconnect from the cadet service.
Definition: cadet_api.c:774
void GNUNET_CADET_close_port(struct GNUNET_CADET_Port *p)
Close a port opened with GNUNET_CADET_open_port.
Definition: cadet_api.c:801
struct GNUNET_MQ_Handle * GNUNET_CADET_get_mq(const struct GNUNET_CADET_Channel *channel)
Obtain the message queue for a connected peer.
Definition: cadet_api.c:1066
struct GNUNET_CADET_Handle * GNUNET_CADET_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the MQ-based cadet service.
Definition: cadet_api.c:894
struct GNUNET_CADET_Port * GNUNET_CADET_open_port(struct GNUNET_CADET_Handle *h, const struct GNUNET_HashCode *port, GNUNET_CADET_ConnectEventHandler connects, void *connects_cls, GNUNET_CADET_WindowSizeEventHandler window_changes, GNUNET_CADET_DisconnectEventHandler disconnects, const struct GNUNET_MQ_MessageHandler *handlers)
Open a port to receive incoming MQ-based channels.
Definition: cadet_api.c:954
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
@ GNUNET_CRYPTO_QUALITY_NONCE
Randomness for IVs etc.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
void GNUNET_CRYPTO_hash_xor(const struct GNUNET_HashCode *a, const struct GNUNET_HashCode *b, struct GNUNET_HashCode *result)
compute result = a ^ b
Definition: crypto_hash.c:135
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MultiHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator's position.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
unsigned int GNUNET_CONTAINER_multihashmap_size(const struct GNUNET_CONTAINER_MultiHashMap *map)
Get the number of key-value pairs in the map.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY
There must only be one value per key; storing a value should fail if a value under the same key alrea...
#define GNUNET_log(kind,...)
#define GNUNET_memcmp(a, b)
Compare memory in a and b, where both must be of the same pointer type.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MIN(a, b)
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_MessageHeader * GNUNET_copy_message(const struct GNUNET_MessageHeader *msg)
Create a copy of the given message.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:304
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:62
#define GNUNET_MQ_msg_nested_mh(mvar, type, mh)
Allocate a GNUNET_MQ_Envelope, and append a payload message after the given message struct.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:77
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:638
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
#define GNUNET_MQ_extract_nested_mh(var)
Return a pointer to the message at the end of the given message.
#define GNUNET_MESSAGE_TYPE_SETI_LISTEN
Listen for operation requests.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_OPERATION_REQUEST
Request to begin set intersection operation.
#define GNUNET_MESSAGE_TYPE_SETI_REJECT
Reject a set request.
#define GNUNET_MESSAGE_TYPE_SETI_EVALUATE
Evaluate a set operation.
#define GNUNET_MESSAGE_TYPE_SETI_ADD
Add element to set.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_BF
Bloom filter message for intersection exchange started by Bob.
#define GNUNET_MESSAGE_TYPE_SETI_RESULT
Handle result message from operation.
#define GNUNET_MESSAGE_TYPE_SETI_REQUEST
Notify the client of an incoming request from a remote peer.
#define GNUNET_MESSAGE_TYPE_SETI_CREATE
Create a new local set.
#define GNUNET_MESSAGE_TYPE_SETI_CANCEL
Cancel a set operation.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_ELEMENT_INFO
Information about the element count for intersection.
#define GNUNET_MESSAGE_TYPE_SETI_ACCEPT
Accept an incoming set request.
#define GNUNET_MESSAGE_TYPE_SETI_P2P_DONE
Intersection operation is done.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:562
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received,...
Definition: scheduler.c:1334
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:975
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1272
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2330
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2249
@ GNUNET_SERVICE_OPTION_NONE
Use defaults.
#define GNUNET_SETI_CONTEXT_MESSAGE_MAX_SIZE
Maximum size of a context message for set operation requests.
void GNUNET_SETI_element_hash(const struct GNUNET_SETI_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: seti_api.c:849
@ GNUNET_SETI_STATUS_DONE
Success, all elements have been sent (and received).
@ GNUNET_SETI_STATUS_ADD_LOCAL
Element should be added to the result set of the local peer, i.e.
@ GNUNET_SETI_STATUS_FAILURE
The other peer refused to do the operation with us, or something went wrong.
@ GNUNET_SETI_STATUS_DEL_LOCAL
Element should be deleted from the result set of the local peer, i.e.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
#define _(String)
GNU gettext support macro.
Definition: platform.h:177
Bloom filter messages exchanged for set intersection calculation.
State we keep per client.
struct GNUNET_MQ_Handle * mq
MQ to talk to client.
struct Listener * listener
Listener, if associated with the client, otherwise NULL.
struct Set * set
Set, if associated with the client, otherwise NULL.
struct GNUNET_SERVICE_Client * client
Client this is about.
Information about an element in the set.
int remote
GNUNET_YES if the element is a remote element, and does not belong to the operation's set.
struct GNUNET_SET_Element element
The actual element.
struct GNUNET_HashCode element_hash
Hash of the element.
unsigned int generation_added
Generation in which the element was added.
struct GNUNET_ARM_Operation * next
This is a doubly-linked list.
Definition: arm_api.c:45
Opaque handle to a channel.
Definition: cadet.h:116
Opaque handle to the service.
Definition: cadet_api.c:39
Opaque handle to a port.
Definition: cadet_api.c:80
Internal representation of the hash map.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:136
Handle to a client that is connected to a service.
Definition: service.c:252
Handle to a service.
Definition: service.c:118
Message sent by a listening client to the service to accept performing the operation with the other p...
Definition: seti.h:77
Sent to the service by the client in order to cancel a set operation.
Definition: seti.h:252
Message sent by the client to the service to ask starting a new set to perform operations with.
Definition: seti.h:40
Message sent by client to the service to add an element to the set.
Definition: seti.h:227
Element stored in a set.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
uint16_t size
Number of bytes in the buffer pointed to by data.
Message sent by client to service to initiate a set operation as a client (not as listener).
Definition: seti.h:152
Message sent by the client to the service to start listening for incoming requests to perform a certa...
Definition: seti.h:54
Message sent by a listening client to the service to reject performing the operation with the other p...
Definition: seti.h:107
A request for an operation with another client.
Definition: seti.h:124
struct GNUNET_PeerIdentity peer_id
Identity of the requesting peer.
Definition: seti.h:139
uint32_t accept_id
ID used to identify the request when accepting or rejecting it.
Definition: seti.h:134
Message sent by the service to the client to indicate an element that is removed (set intersection) o...
Definition: seti.h:192
uint32_t request_id
id the result belongs to
Definition: seti.h:206
uint16_t element_type
Type of the element attached to the message, if any.
Definition: seti.h:217
uint16_t result_status
Was the evaluation successful? Contains an enum GNUNET_SETI_Status in NBO.
Definition: seti.h:212
uint16_t size
Number of bytes in the buffer pointed to by data.
const void * data
Actual data of the element.
uint16_t element_type
Application-specific element type.
Handle for the service.
Last message, send to confirm the final set.
uint32_t final_element_count
Final number of elements in intersection.
struct GNUNET_HashCode element_xor_hash
XOR of all hashes over all elements remaining in the set.
During intersection, the first (and possibly second) message sent is the number of elements in the se...
A listener is inhabited by a client, and waits for evaluation requests from remote peers.
struct Listener * next
Listeners are held in a doubly linked list.
struct ClientState * cs
Client that owns the listener.
struct GNUNET_HashCode app_id
Application ID for the operation, used to distinguish multiple operations of the same type with the s...
struct GNUNET_CADET_Port * open_port
The port we are listening on with CADET.
struct Listener * prev
Listeners are held in a doubly linked list.
struct Operation * op_tail
Tail of DLL of operations this listener is responsible for.
struct Operation * op_head
Head of DLL of operations this listener is responsible for.
Operation context used to execute a set operation.
unsigned int generation_created
Generation in which the operation handle was created.
uint32_t suggest_id
Unique request id for the request from a remote peer, sent to the client, which will accept or reject...
int channel_death_expected
Set whenever we reach the state where the death of the channel is perfectly fine and should NOT resul...
struct GNUNET_MessageHeader * context_msg
Context message, may be NULL.
struct GNUNET_CONTAINER_BloomFilter * local_bf
BF of the set's elements.
uint32_t bf_data_offset
How many bytes of bf_data are valid?
struct Operation * prev
Kept in a DLL of the listener, if listener is non-NULL.
struct GNUNET_CADET_Channel * channel
Channel to the peer.
uint32_t bf_bits_per_element
size of the bloomfilter
struct GNUNET_CONTAINER_BloomFilter * remote_bf
The bf we currently receive.
struct GNUNET_HashCode my_xor
XOR of the keys of all of the elements (remaining) in my set.
struct GNUNET_HashCode other_xor
XOR of the keys of all of the elements (remaining) in the other peer's set.
char * bf_data
For multipart BF transmissions, we have to store the bloomfilter-data until we fully received it.
struct GNUNET_MQ_Handle * mq
Message queue for the channel.
struct GNUNET_PeerIdentity peer
The identity of the requesting peer.
int client_done_sent
Did we tell the client that we are done?
enum IntersectionOperationPhase phase
Current state of the operation.
struct GNUNET_CONTAINER_MultiHashMapIterator * full_result_iter
Iterator for sending the final set of my_elements to the client.
int return_intersection
When are elements sent to the client, and which elements are sent?
struct Listener * listener
Port this operation runs on.
uint32_t bf_data_size
size of the bloomfilter in bf_data.
struct Operation * next
Kept in a DLL of the listener, if listener is non-NULL.
uint32_t salt
Salt to use for the operation.
uint32_t my_element_count
Current element count contained within my_elements.
uint32_t remote_element_count
Remote peer's element count.
uint32_t client_request_id
ID used to identify an operation between service and client.
struct GNUNET_CONTAINER_MultiHashMap * my_elements
Remaining elements in the intersection operation.
struct GNUNET_SCHEDULER_Task * timeout_task
Timeout task, if the incoming peer has not been accepted after the timeout, it will be disconnected.
struct Set * set
Set associated with the operation, NULL until the spec has been associated with a set.
SetContent stores the actual set elements, which may be shared by multiple generations derived from o...
int iterator_count
Number of concurrently active iterators.
unsigned int latest_generation
FIXME: document!
struct GNUNET_CONTAINER_MultiHashMap * elements
Maps struct GNUNET_HashCode * to struct ElementEntry *.
unsigned int refcount
Number of references to the content.
A set that supports a specific operation with other peers.
struct Set * next
Sets are held in a doubly linked list (in sets_head and sets_tail).
struct Operation * ops_head
Evaluate operations are held in a linked list.
struct Operation * ops_tail
Evaluate operations are held in a linked list.
struct Set * prev
Sets are held in a doubly linked list.
struct SetContent * content
Content, possibly shared by multiple sets, and thus reference counted.
uint32_t current_set_element_count
Number of currently valid elements in the set which have not been removed.
struct ClientState * cs
Client that owns the set.
unsigned int current_generation
Current generation, that is, number of previously executed operations and lazy copies on the underlyi...