GNUnet  0.11.x
gnunet-service-consensus.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37 
38 
40 {
45  VOTE_STAY = 0,
49  VOTE_ADD = 1,
54 };
55 
56 
58 {
62 };
63 
64 
66 
71 struct TaskKey
72 {
76  uint16_t kind GNUNET_PACKED;
77 
83 
88 
93 
100 };
101 
102 
103 struct SetKey
104 {
105  int set_kind GNUNET_PACKED;
108 };
109 
110 
111 struct SetEntry
112 {
113  struct SetKey key;
121 };
122 
123 
124 struct DiffKey
125 {
126  int diff_kind GNUNET_PACKED;
129 };
130 
131 struct RfnKey
132 {
133  int rfn_kind GNUNET_PACKED;
136 };
137 
138 
140 
142 {
156 };
157 
158 
160 {
169 };
170 
172 {
177 };
178 
180 {
185 };
186 
187 
188 struct SetOpCls
189 {
190  struct SetKey input_set;
191 
192  struct SetKey output_set;
193  struct RfnKey output_rfn;
194  struct DiffKey output_diff;
195 
197 
199 
201 };
202 
203 
204 struct FinishCls
205 {
206  struct SetKey input_set;
207 };
208 
214 {
215  struct SetOpCls setop;
216  struct FinishCls finish;
217 };
218 
219 struct TaskEntry;
220 
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222 
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228  struct TaskKey key;
229 
230  struct Step *step;
231 
233 
235 
238 
239  union TaskFuncCls cls;
240 };
241 
242 
243 struct Step
244 {
249  struct Step *prev;
250 
255  struct Step *next;
256 
258 
262  struct TaskEntry **tasks;
263  unsigned int tasks_len;
264  unsigned int tasks_cap;
265 
266  unsigned int finished_tasks;
267 
268  /*
269  * Tasks that have this task as dependency.
270  *
271  * We store pointers to subordinates rather
272  * than to prerequisites since it makes
273  * tracking the readiness of a task easier.
274  */
275  struct Step **subordinates;
276  unsigned int subordinates_len;
277  unsigned int subordinates_cap;
278 
284 
285  /*
286  * Task that will run this step despite
287  * any pending prerequisites.
288  */
290 
291  unsigned int is_running;
292 
293  unsigned int is_finished;
294 
295  /*
296  * Synchrony round of the task.
297  * Determines the deadline for the task.
298  */
299  unsigned int round;
300 
305  char *debug_name;
306 
319 };
320 
321 
323 {
325 
326  /*
327  * GNUNET_YES if the peer votes for the proposal.
328  */
329  int *votes;
330 
335  enum ReferendumVote proposal;
336 };
337 
338 
340 {
341  struct RfnKey key;
342 
343  /*
344  * Elements where there is at least one proposed change.
345  *
346  * Maps the hash of the GNUNET_SET_Element
347  * to 'struct RfnElementInfo'.
348  */
350 
351  unsigned int num_peers;
352 
364 
365 
372 };
373 
374 
376 {
378 
383  int weight;
384 };
385 
386 
390 struct DiffEntry
391 {
392  struct DiffKey key;
394 };
395 
396 struct SetHandle
397 {
398  struct SetHandle *prev;
399  struct SetHandle *next;
400 
402 };
403 
404 
409 {
414 
419 
421 
425 
430 
431  /*
432  * Mapping from (hashed) TaskKey to TaskEntry.
433  *
434  * We map the application_id for a round to the task that should be
435  * executed, so we don't have to go through all task whenever we get
436  * an incoming set op request.
437  */
439 
440  struct Step *steps_head;
441  struct Step *steps_tail;
442 
444 
446 
451  struct GNUNET_HashCode global_id;
452 
457 
462 
466  struct GNUNET_TIME_Absolute conclude_start;
467 
473  struct GNUNET_TIME_Absolute conclude_deadline;
474 
476 
480  unsigned int num_peers;
481 
485  unsigned int local_peer_idx;
486 
492 
497 
501  uint64_t first_size;
502 
504 
508  uint64_t lower_bound;
509 
512 };
513 
518 
523 
527 static const struct GNUNET_CONFIGURATION_Handle *cfg;
528 
533 
538 
539 
540 static void
541 finish_task (struct TaskEntry *task);
542 
543 
544 static void
545 run_ready_steps (struct ConsensusSession *session);
546 
547 
548 static const char *
549 phasename (uint16_t phase)
550 {
551  switch (phase)
552  {
553  case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
554 
555  case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556 
557  case PHASE_KIND_FINISH: return "FINISH";
558 
559  case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
560 
561  case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
562 
563  case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
564 
565  case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
566 
567  case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
568 
569  case PHASE_KIND_APPLY_REP: return "APPLY_REP";
570 
571  default: return "(unknown)";
572  }
573 }
574 
575 
576 static const char *
577 setname (uint16_t kind)
578 {
579  switch (kind)
580  {
581  case SET_KIND_CURRENT: return "CURRENT";
582 
583  case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
584 
585  case SET_KIND_NONE: return "NONE";
586 
587  default: return "(unknown)";
588  }
589 }
590 
591 
592 static const char *
593 rfnname (uint16_t kind)
594 {
595  switch (kind)
596  {
597  case RFN_KIND_NONE: return "NONE";
598 
599  case RFN_KIND_ECHO: return "ECHO";
600 
601  case RFN_KIND_CONFIRM: return "CONFIRM";
602 
603  default: return "(unknown)";
604  }
605 }
606 
607 
608 static const char *
609 diffname (uint16_t kind)
610 {
611  switch (kind)
612  {
613  case DIFF_KIND_NONE: return "NONE";
614 
615  case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
616 
617  case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
618 
619  case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
620 
621  default: return "(unknown)";
622  }
623 }
624 
625 
626 #ifdef GNUNET_EXTRA_LOGGING
627 
628 
629 static const char *
630 debug_str_element (const struct GNUNET_SET_Element *el)
631 {
632  struct GNUNET_HashCode hash;
633 
634  GNUNET_SET_element_hash (el, &hash);
635 
636  return GNUNET_h2s (&hash);
637 }
638 
639 
640 static const char *
641 debug_str_task_key (struct TaskKey *tk)
642 {
643  static char buf[256];
644 
645  snprintf (buf, sizeof(buf),
646  "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
647  phasename (tk->kind), tk->peer1, tk->peer2,
648  tk->leader, tk->repetition);
649 
650  return buf;
651 }
652 
653 
654 static const char *
655 debug_str_diff_key (struct DiffKey *dk)
656 {
657  static char buf[256];
658 
659  snprintf (buf, sizeof(buf),
660  "DiffKey kind=%s, k1=%d, k2=%d",
661  diffname (dk->diff_kind), dk->k1, dk->k2);
662 
663  return buf;
664 }
665 
666 
667 static const char *
668 debug_str_set_key (const struct SetKey *sk)
669 {
670  static char buf[256];
671 
672  snprintf (buf, sizeof(buf),
673  "SetKey kind=%s, k1=%d, k2=%d",
674  setname (sk->set_kind), sk->k1, sk->k2);
675 
676  return buf;
677 }
678 
679 
680 static const char *
681 debug_str_rfn_key (const struct RfnKey *rk)
682 {
683  static char buf[256];
684 
685  snprintf (buf, sizeof(buf),
686  "RfnKey kind=%s, k1=%d, k2=%d",
687  rfnname (rk->rfn_kind), rk->k1, rk->k2);
688 
689  return buf;
690 }
691 
692 
693 #endif /* GNUNET_EXTRA_LOGGING */
694 
695 
705 static int
707  const struct GNUNET_SET_Element *element)
708 {
709  struct TaskEntry *task = (struct TaskEntry *) cls;
710  struct ConsensusSession *session = task->step->session;
711  struct GNUNET_MQ_Envelope *ev;
712 
713  if (NULL != element)
714  {
716  const struct ConsensusElement *ce;
717 
719  element->element_type);
720  ce = element->data;
721 
722  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n",
723  (unsigned) ce->marker);
724 
725  if (0 != ce->marker)
726  return GNUNET_YES;
727 
729  "P%d: sending element %s to client\n",
730  session->local_peer_idx,
731  debug_str_element (element));
732 
733  ev = GNUNET_MQ_msg_extra (m, element->size - sizeof(struct
736  m->element_type = ce->payload_type;
737  GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof(struct
739  GNUNET_MQ_send (session->client_mq, ev);
740  }
741  else
742  {
744  "P%d: finished iterating elements for client\n",
745  session->local_peer_idx);
746  ev = GNUNET_MQ_msg_header (
748  GNUNET_MQ_send (session->client_mq, ev);
749  }
750  return GNUNET_YES;
751 }
752 
753 
754 static struct SetEntry *
755 lookup_set (struct ConsensusSession *session, struct SetKey *key)
756 {
757  struct GNUNET_HashCode hash;
758 
760  "P%u: looking up set {%s}\n",
761  session->local_peer_idx,
762  debug_str_set_key (key));
763 
765  GNUNET_CRYPTO_hash (key, sizeof(struct SetKey), &hash);
766  return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
767 }
768 
769 
770 static struct DiffEntry *
771 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
772 {
773  struct GNUNET_HashCode hash;
774 
776  "P%u: looking up diff {%s}\n",
777  session->local_peer_idx,
778  debug_str_diff_key (key));
779 
781  GNUNET_CRYPTO_hash (key, sizeof(struct DiffKey), &hash);
782  return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
783 }
784 
785 
786 static struct ReferendumEntry *
787 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
788 {
789  struct GNUNET_HashCode hash;
790 
792  "P%u: looking up rfn {%s}\n",
793  session->local_peer_idx,
794  debug_str_rfn_key (key));
795 
797  GNUNET_CRYPTO_hash (key, sizeof(struct RfnKey), &hash);
798  return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
799 }
800 
801 
802 static void
803 diff_insert (struct DiffEntry *diff,
804  int weight,
805  const struct GNUNET_SET_Element *element)
806 {
807  struct DiffElementInfo *di;
808  struct GNUNET_HashCode hash;
809 
810  GNUNET_assert ((1 == weight) || (-1 == weight));
811 
813  "diff_insert with element size %u\n",
814  element->size);
815 
817  "hashing element\n");
818 
819  GNUNET_SET_element_hash (element, &hash);
820 
822  "hashed element\n");
823 
824  di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
825 
826  if (NULL == di)
827  {
828  di = GNUNET_new (struct DiffElementInfo);
829  di->element = GNUNET_SET_element_dup (element);
832  &hash, di,
834  }
835 
836  di->weight = weight;
837 }
838 
839 
840 static void
842  uint16_t commit_peer)
843 {
844  GNUNET_assert (commit_peer < rfn->num_peers);
845 
846  rfn->peer_commited[commit_peer] = GNUNET_YES;
847 }
848 
849 
850 static void
852  uint16_t contested_peer)
853 {
854  GNUNET_assert (contested_peer < rfn->num_peers);
855 
856  rfn->peer_contested[contested_peer] = GNUNET_YES;
857 }
858 
859 
860 static uint16_t
862 {
863  uint16_t i;
864  uint16_t ret;
865 
866  ret = 0;
867  for (i = 0; i < rfn->num_peers; i++)
868  if ((GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO ==
869  rfn->peer_contested[i]))
870  ret++;
871 
872  return ret;
873 }
874 
875 
876 static void
878  uint16_t voting_peer,
879  enum ReferendumVote vote,
880  const struct GNUNET_SET_Element *element)
881 {
882  struct RfnElementInfo *ri;
883  struct GNUNET_HashCode hash;
884 
885  GNUNET_assert (voting_peer < rfn->num_peers);
886 
887  /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
888  since VOTE_KEEP is implicit in not voting. */
889  GNUNET_assert ((VOTE_ADD == vote) || (VOTE_REMOVE == vote));
890 
891  GNUNET_SET_element_hash (element, &hash);
893 
894  if (NULL == ri)
895  {
896  ri = GNUNET_new (struct RfnElementInfo);
897  ri->element = GNUNET_SET_element_dup (element);
898  ri->votes = GNUNET_new_array (rfn->num_peers, int);
901  &hash, ri,
903  }
904 
905  ri->votes[voting_peer] = GNUNET_YES;
906  ri->proposal = vote;
907 }
908 
909 
910 static uint16_t
912 {
913  uint16_t me = task->step->session->local_peer_idx;
914 
915  if (task->key.peer1 == me)
916  return task->key.peer2;
917  return task->key.peer1;
918 }
919 
920 
921 static int
922 cmp_uint64_t (const void *pa, const void *pb)
923 {
924  uint64_t a = *(uint64_t *) pa;
925  uint64_t b = *(uint64_t *) pb;
926 
927  if (a == b)
928  return 0;
929  if (a < b)
930  return -1;
931  return 1;
932 }
933 
934 
944 static void
945 set_result_cb (void *cls,
946  const struct GNUNET_SET_Element *element,
947  uint64_t current_size,
949 {
950  struct TaskEntry *task = cls;
951  struct ConsensusSession *session = task->step->session;
952  struct SetEntry *output_set = NULL;
953  struct DiffEntry *output_diff = NULL;
954  struct ReferendumEntry *output_rfn = NULL;
955  unsigned int other_idx;
956  struct SetOpCls *setop;
957  const struct ConsensusElement *consensus_element = NULL;
958 
959  if (NULL != element)
960  {
962  "P%u: got element of type %u, status %u\n",
963  session->local_peer_idx,
964  (unsigned) element->element_type,
965  (unsigned) status);
967  element->element_type);
968  consensus_element = element->data;
969  }
970 
971  setop = &task->cls.setop;
972 
973 
975  "P%u: got set result for {%s}, status %u\n",
976  session->local_peer_idx,
977  debug_str_task_key (&task->key),
978  status);
979 
980  if (GNUNET_NO == task->is_started)
981  {
982  GNUNET_break_op (0);
983  return;
984  }
985 
986  if (GNUNET_YES == task->is_finished)
987  {
988  GNUNET_break_op (0);
989  return;
990  }
991 
992  other_idx = task_other_peer (task);
993 
994  if (SET_KIND_NONE != setop->output_set.set_kind)
995  {
996  output_set = lookup_set (session, &setop->output_set);
997  GNUNET_assert (NULL != output_set);
998  }
999 
1000  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
1001  {
1002  output_diff = lookup_diff (session, &setop->output_diff);
1003  GNUNET_assert (NULL != output_diff);
1004  }
1005 
1006  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
1007  {
1008  output_rfn = lookup_rfn (session, &setop->output_rfn);
1009  GNUNET_assert (NULL != output_rfn);
1010  }
1011 
1012  if (GNUNET_YES == session->peers_blacklisted[other_idx])
1013  {
1014  /* Peer might have been blacklisted
1015  by a gradecast running in parallel, ignore elements from now */
1016  if (GNUNET_SET_STATUS_ADD_LOCAL == status)
1017  return;
1018  if (GNUNET_SET_STATUS_ADD_REMOTE == status)
1019  return;
1020  }
1021 
1022  if ((NULL != consensus_element) && (0 != consensus_element->marker))
1023  {
1025  "P%u: got some marker\n",
1026  session->local_peer_idx);
1027  if ((GNUNET_YES == setop->transceive_contested) &&
1028  (CONSENSUS_MARKER_CONTESTED == consensus_element->marker))
1029  {
1030  GNUNET_assert (NULL != output_rfn);
1031  rfn_contest (output_rfn, task_other_peer (task));
1032  return;
1033  }
1034 
1035  if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1036  {
1038  "P%u: got size marker\n",
1039  session->local_peer_idx);
1040 
1041 
1042  struct ConsensusSizeElement *cse = (void *) consensus_element;
1043 
1044  if (cse->sender_index == other_idx)
1045  {
1046  if (NULL == session->first_sizes_received)
1047  session->first_sizes_received = GNUNET_new_array (session->num_peers,
1048  uint64_t);
1049  session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1050 
1051  uint64_t *copy = GNUNET_memdup (session->first_sizes_received,
1052  sizeof(uint64_t) * session->num_peers);
1053  qsort (copy, session->num_peers, sizeof(uint64_t), cmp_uint64_t);
1054  session->lower_bound = copy[session->num_peers / 3 + 1];
1056  "P%u: lower bound %llu\n",
1057  session->local_peer_idx,
1058  (long long) session->lower_bound);
1059  GNUNET_free (copy);
1060  }
1061  return;
1062  }
1063 
1064  return;
1065  }
1066 
1067  switch (status)
1068  {
1070  GNUNET_assert (NULL != consensus_element);
1072  "Adding element in Task {%s}\n",
1073  debug_str_task_key (&task->key));
1074  if (NULL != output_set)
1075  {
1076  // FIXME: record pending adds, use callback
1077  GNUNET_SET_add_element (output_set->h,
1078  element,
1079  NULL,
1080  NULL);
1081 #ifdef GNUNET_EXTRA_LOGGING
1083  "P%u: adding element %s into set {%s} of task {%s}\n",
1084  session->local_peer_idx,
1085  debug_str_element (element),
1086  debug_str_set_key (&setop->output_set),
1087  debug_str_task_key (&task->key));
1088 #endif
1089  }
1090  if (NULL != output_diff)
1091  {
1092  diff_insert (output_diff, 1, element);
1093 #ifdef GNUNET_EXTRA_LOGGING
1095  "P%u: adding element %s into diff {%s} of task {%s}\n",
1096  session->local_peer_idx,
1097  debug_str_element (element),
1098  debug_str_diff_key (&setop->output_diff),
1099  debug_str_task_key (&task->key));
1100 #endif
1101  }
1102  if (NULL != output_rfn)
1103  {
1104  rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1105 #ifdef GNUNET_EXTRA_LOGGING
1107  "P%u: adding element %s into rfn {%s} of task {%s}\n",
1108  session->local_peer_idx,
1109  debug_str_element (element),
1110  debug_str_rfn_key (&setop->output_rfn),
1111  debug_str_task_key (&task->key));
1112 #endif
1113  }
1114  // XXX: add result to structures in task
1115  break;
1116 
1118  GNUNET_assert (NULL != consensus_element);
1119  if (GNUNET_YES == setop->do_not_remove)
1120  break;
1121  if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1122  break;
1124  "Removing element in Task {%s}\n",
1125  debug_str_task_key (&task->key));
1126  if (NULL != output_set)
1127  {
1128  // FIXME: record pending adds, use callback
1129  GNUNET_SET_remove_element (output_set->h,
1130  element,
1131  NULL,
1132  NULL);
1133 #ifdef GNUNET_EXTRA_LOGGING
1135  "P%u: removing element %s from set {%s} of task {%s}\n",
1136  session->local_peer_idx,
1137  debug_str_element (element),
1138  debug_str_set_key (&setop->output_set),
1139  debug_str_task_key (&task->key));
1140 #endif
1141  }
1142  if (NULL != output_diff)
1143  {
1144  diff_insert (output_diff, -1, element);
1145 #ifdef GNUNET_EXTRA_LOGGING
1147  "P%u: removing element %s from diff {%s} of task {%s}\n",
1148  session->local_peer_idx,
1149  debug_str_element (element),
1150  debug_str_diff_key (&setop->output_diff),
1151  debug_str_task_key (&task->key));
1152 #endif
1153  }
1154  if (NULL != output_rfn)
1155  {
1156  rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1157 #ifdef GNUNET_EXTRA_LOGGING
1159  "P%u: removing element %s from rfn {%s} of task {%s}\n",
1160  session->local_peer_idx,
1161  debug_str_element (element),
1162  debug_str_rfn_key (&setop->output_rfn),
1163  debug_str_task_key (&task->key));
1164 #endif
1165  }
1166  break;
1167 
1169  // XXX: check first if any changes to the underlying
1170  // set are still pending
1172  "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1173  session->local_peer_idx,
1174  debug_str_task_key (&task->key),
1175  (unsigned int) task->step->finished_tasks,
1176  (unsigned int) task->step->tasks_len);
1177  if (NULL != output_rfn)
1178  {
1179  rfn_commit (output_rfn, task_other_peer (task));
1180  }
1181  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1182  {
1183  session->first_size = current_size;
1184  }
1185  finish_task (task);
1186  break;
1187 
1189  // XXX: cleanup
1190  GNUNET_break_op (0);
1191  finish_task (task);
1192  return;
1193 
1194  default:
1195  /* not reached */
1196  GNUNET_assert (0);
1197  }
1198 }
1199 
1200 
1201 #ifdef EVIL
1202 
1203 enum EvilnessType
1204 {
1205  EVILNESS_NONE,
1206  EVILNESS_CRAM_ALL,
1207  EVILNESS_CRAM_LEAD,
1208  EVILNESS_CRAM_ECHO,
1209  EVILNESS_SLACK,
1210  EVILNESS_SLACK_A2A,
1211 };
1212 
1213 enum EvilnessSubType
1214 {
1215  EVILNESS_SUB_NONE,
1216  EVILNESS_SUB_REPLACEMENT,
1217  EVILNESS_SUB_NO_REPLACEMENT,
1218 };
1219 
1220 struct Evilness
1221 {
1222  enum EvilnessType type;
1223  enum EvilnessSubType subtype;
1224  unsigned int num;
1225 };
1226 
1227 
1228 static int
1229 parse_evilness_cram_subtype (const char *evil_subtype_str, struct
1230  Evilness *evil)
1231 {
1232  if (0 == strcmp ("replace", evil_subtype_str))
1233  {
1234  evil->subtype = EVILNESS_SUB_REPLACEMENT;
1235  }
1236  else if (0 == strcmp ("noreplace", evil_subtype_str))
1237  {
1238  evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1239  }
1240  else
1241  {
1243  "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1244  evil_subtype_str);
1245  return GNUNET_SYSERR;
1246  }
1247  return GNUNET_OK;
1248 }
1249 
1250 
1251 static void
1252 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1253 {
1254  char *evil_spec;
1255  char *field;
1256  char *evil_type_str = NULL;
1257  char *evil_subtype_str = NULL;
1258 
1259  GNUNET_assert (NULL != evil);
1260 
1261  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus",
1262  "EVIL_SPEC",
1263  &evil_spec))
1264  {
1266  "P%u: no evilness\n",
1267  session->local_peer_idx);
1268  evil->type = EVILNESS_NONE;
1269  return;
1270  }
1272  "P%u: got evilness spec\n",
1273  session->local_peer_idx);
1274 
1275  for (field = strtok (evil_spec, "/");
1276  NULL != field;
1277  field = strtok (NULL, "/"))
1278  {
1279  unsigned int peer_num;
1280  unsigned int evil_num;
1281  int ret;
1282 
1283  evil_type_str = NULL;
1284  evil_subtype_str = NULL;
1285 
1286  ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str,
1287  &evil_subtype_str, &evil_num);
1288 
1289  if (ret != 4)
1290  {
1292  "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1293  field,
1294  ret);
1295  goto not_evil;
1296  }
1297 
1298  GNUNET_assert (NULL != evil_type_str);
1299  GNUNET_assert (NULL != evil_subtype_str);
1300 
1301  if (peer_num == session->local_peer_idx)
1302  {
1303  if (0 == strcmp ("slack", evil_type_str))
1304  {
1305  evil->type = EVILNESS_SLACK;
1306  }
1307  if (0 == strcmp ("slack-a2a", evil_type_str))
1308  {
1309  evil->type = EVILNESS_SLACK_A2A;
1310  }
1311  else if (0 == strcmp ("cram-all", evil_type_str))
1312  {
1313  evil->type = EVILNESS_CRAM_ALL;
1314  evil->num = evil_num;
1315  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1316  goto not_evil;
1317  }
1318  else if (0 == strcmp ("cram-lead", evil_type_str))
1319  {
1320  evil->type = EVILNESS_CRAM_LEAD;
1321  evil->num = evil_num;
1322  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1323  goto not_evil;
1324  }
1325  else if (0 == strcmp ("cram-echo", evil_type_str))
1326  {
1327  evil->type = EVILNESS_CRAM_ECHO;
1328  evil->num = evil_num;
1329  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1330  goto not_evil;
1331  }
1332  else
1333  {
1335  "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1336  evil_type_str);
1337  goto not_evil;
1338  }
1339  goto cleanup;
1340  }
1341  /* No GNUNET_free since memory was allocated by libc */
1342  free (evil_type_str);
1343  evil_type_str = NULL;
1344  evil_subtype_str = NULL;
1345  }
1346 not_evil:
1347  evil->type = EVILNESS_NONE;
1348 cleanup:
1349  GNUNET_free (evil_spec);
1350  /* no GNUNET_free_non_null since it wasn't
1351  * allocated with GNUNET_malloc */
1352  if (NULL != evil_type_str)
1353  free (evil_type_str);
1354  if (NULL != evil_subtype_str)
1355  free (evil_subtype_str);
1356 }
1357 
1358 
1359 #endif
1360 
1361 
1366 static void
1367 commit_set (struct ConsensusSession *session,
1368  struct TaskEntry *task)
1369 {
1370  struct SetEntry *set;
1371  struct SetOpCls *setop = &task->cls.setop;
1372 
1373  GNUNET_assert (NULL != setop->op);
1374  set = lookup_set (session, &setop->input_set);
1375  GNUNET_assert (NULL != set);
1376 
1377  if ((GNUNET_YES == setop->transceive_contested) && (GNUNET_YES ==
1378  set->is_contested))
1379  {
1380  struct GNUNET_SET_Element element;
1381  struct ConsensusElement ce = { 0 };
1383  element.data = &ce;
1384  element.size = sizeof(struct ConsensusElement);
1386  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1387  }
1388 
1389  if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1390  {
1391  struct GNUNET_SET_Element element;
1392  struct ConsensusSizeElement cse = {
1393  .size = 0,
1394  .sender_index = 0
1395  };
1396  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1398  cse.size = GNUNET_htonll (session->first_size);
1399  cse.sender_index = session->local_peer_idx;
1400  element.data = &cse;
1401  element.size = sizeof(struct ConsensusSizeElement);
1403  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1404  }
1405 
1406 #ifdef EVIL
1407  {
1408  unsigned int i;
1409  struct Evilness evil;
1410 
1411  get_evilness (session, &evil);
1412  if (EVILNESS_NONE != evil.type)
1413  {
1414  /* Useful for evaluation */
1415  GNUNET_STATISTICS_set (statistics,
1416  "is evil",
1417  1,
1418  GNUNET_NO);
1419  }
1420  switch (evil.type)
1421  {
1422  case EVILNESS_CRAM_ALL:
1423  case EVILNESS_CRAM_LEAD:
1424  case EVILNESS_CRAM_ECHO:
1425  /* We're not cramming elements in the
1426  all-to-all round, since that would just
1427  add more elements to the result set, but
1428  wouldn't test robustness. */
1429  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1430  {
1431  GNUNET_SET_commit (setop->op, set->h);
1432  break;
1433  }
1434  if ((EVILNESS_CRAM_LEAD == evil.type) &&
1435  ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) ||
1436  (SET_KIND_CURRENT != set->key.set_kind) ))
1437  {
1438  GNUNET_SET_commit (setop->op, set->h);
1439  break;
1440  }
1441  if ((EVILNESS_CRAM_ECHO == evil.type) && (PHASE_KIND_GRADECAST_ECHO !=
1442  task->key.kind))
1443  {
1444  GNUNET_SET_commit (setop->op, set->h);
1445  break;
1446  }
1447  for (i = 0; i < evil.num; i++)
1448  {
1449  struct GNUNET_SET_Element element;
1450  struct ConsensusStuffedElement se = {
1451  .ce.payload_type = 0,
1452  .ce.marker = 0,
1453  };
1454  element.data = &se;
1455  element.size = sizeof(struct ConsensusStuffedElement);
1457 
1458  if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1459  {
1460  /* Always generate a new element. */
1462  &se.rand);
1463  }
1464  else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1465  {
1466  /* Always cram the same elements, derived from counter. */
1467  GNUNET_CRYPTO_hash (&i, sizeof(i), &se.rand);
1468  }
1469  else
1470  {
1471  GNUNET_assert (0);
1472  }
1473  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1474 #ifdef GNUNET_EXTRA_LOGGING
1476  "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1477  session->local_peer_idx,
1478  debug_str_element (&element),
1479  debug_str_set_key (&setop->input_set),
1480  debug_str_task_key (&task->key));
1481 #endif
1482  }
1483  GNUNET_STATISTICS_update (statistics,
1484  "# stuffed elements",
1485  evil.num,
1486  GNUNET_NO);
1487  GNUNET_SET_commit (setop->op, set->h);
1488  break;
1489 
1490  case EVILNESS_SLACK:
1492  "P%u: evil peer: slacking\n",
1493  (unsigned int) session->local_peer_idx);
1494 
1495  /* Do nothing. */
1496  case EVILNESS_SLACK_A2A:
1497  if ((PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) ||
1498  (PHASE_KIND_ALL_TO_ALL == task->key.kind))
1499  {
1500  struct GNUNET_SET_Handle *empty_set;
1501  empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1502  GNUNET_SET_commit (setop->op, empty_set);
1503  GNUNET_SET_destroy (empty_set);
1504  }
1505  else
1506  {
1507  GNUNET_SET_commit (setop->op, set->h);
1508  }
1509  break;
1510 
1511  case EVILNESS_NONE:
1512  GNUNET_SET_commit (setop->op, set->h);
1513  break;
1514  }
1515  }
1516 #else
1517  if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1518  {
1519  GNUNET_SET_commit (setop->op, set->h);
1520  }
1521  else
1522  {
1523  /* For our testcases, we don't want the blacklisted
1524  peers to wait. */
1526  setop->op = NULL;
1527  finish_task (task);
1528  }
1529 #endif
1530 }
1531 
1532 
1533 static void
1534 put_diff (struct ConsensusSession *session,
1535  struct DiffEntry *diff)
1536 {
1537  struct GNUNET_HashCode hash;
1538 
1539  GNUNET_assert (NULL != diff);
1540 
1541  GNUNET_CRYPTO_hash (&diff->key, sizeof(struct DiffKey), &hash);
1543  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash,
1544  diff,
1546 }
1547 
1548 
1549 static void
1550 put_set (struct ConsensusSession *session,
1551  struct SetEntry *set)
1552 {
1553  struct GNUNET_HashCode hash;
1554 
1555  GNUNET_assert (NULL != set->h);
1556 
1558  "Putting set %s\n",
1559  debug_str_set_key (&set->key));
1560 
1561  GNUNET_CRYPTO_hash (&set->key, sizeof(struct SetKey), &hash);
1563  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1565 }
1566 
1567 
1568 static void
1569 put_rfn (struct ConsensusSession *session,
1570  struct ReferendumEntry *rfn)
1571 {
1572  struct GNUNET_HashCode hash;
1573 
1574  GNUNET_CRYPTO_hash (&rfn->key, sizeof(struct RfnKey), &hash);
1576  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1578 }
1579 
1580 
1581 static void
1583 {
1584  /* not implemented yet */
1585  GNUNET_assert (0);
1586 }
1587 
1588 
1589 static void
1591  struct ReferendumEntry *rfn,
1592  uint16_t voting_peer,
1593  uint16_t num_peers)
1594 {
1596  struct DiffElementInfo *di;
1597 
1599 
1600  while (GNUNET_YES ==
1602  NULL,
1603  (const void **) &di))
1604  {
1605  if (di->weight > 0)
1606  {
1607  rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1608  }
1609  if (di->weight < 0)
1610  {
1611  rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1612  }
1613  }
1614 
1616 }
1617 
1618 
1619 struct DiffEntry *
1621 {
1622  struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1623 
1625 
1626  return d;
1627 }
1628 
1629 
1630 struct DiffEntry *
1631 diff_compose (struct DiffEntry *diff_1,
1632  struct DiffEntry *diff_2)
1633 {
1634  struct DiffEntry *diff_new;
1636  struct DiffElementInfo *di;
1637 
1638  diff_new = diff_create ();
1639 
1642  (const
1643  void **) &
1644  di))
1645  {
1646  diff_insert (diff_new, di->weight, di->element);
1647  }
1649 
1652  (const
1653  void **) &
1654  di))
1655  {
1656  diff_insert (diff_new, di->weight, di->element);
1657  }
1659 
1660  return diff_new;
1661 }
1662 
1663 
1664 struct ReferendumEntry *
1665 rfn_create (uint16_t size)
1666 {
1667  struct ReferendumEntry *rfn;
1668 
1669  rfn = GNUNET_new (struct ReferendumEntry);
1671  rfn->peer_commited = GNUNET_new_array (size, int);
1672  rfn->peer_contested = GNUNET_new_array (size, int);
1673  rfn->num_peers = size;
1674 
1675  return rfn;
1676 }
1677 
1678 
1679 #if UNUSED
1680 static void
1681 diff_destroy (struct DiffEntry *diff)
1682 {
1684  GNUNET_free (diff);
1685 }
1686 
1687 
1688 #endif
1689 
1690 
1696 static void
1697 rfn_majority (const struct ReferendumEntry *rfn,
1698  const struct RfnElementInfo *ri,
1699  uint16_t *ret_majority,
1700  enum ReferendumVote *ret_vote)
1701 {
1702  uint16_t votes_yes = 0;
1703  uint16_t num_commited = 0;
1704  uint16_t i;
1705 
1707  "Computing rfn majority for element %s of rfn {%s}\n",
1708  debug_str_element (ri->element),
1709  debug_str_rfn_key (&rfn->key));
1710 
1711  for (i = 0; i < rfn->num_peers; i++)
1712  {
1713  if (GNUNET_NO == rfn->peer_commited[i])
1714  continue;
1715  num_commited++;
1716 
1717  if (GNUNET_YES == ri->votes[i])
1718  votes_yes++;
1719  }
1720 
1721  if (votes_yes > (num_commited) / 2)
1722  {
1723  *ret_vote = ri->proposal;
1724  *ret_majority = votes_yes;
1725  }
1726  else
1727  {
1728  *ret_vote = VOTE_STAY;
1729  *ret_majority = num_commited - votes_yes;
1730  }
1731 }
1732 
1733 
1735 {
1736  struct TaskEntry *task;
1737  struct SetKey dst_set_key;
1738 };
1739 
1740 
1741 static void
1742 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1743 {
1744  struct SetCopyCls *scc = cls;
1745  struct TaskEntry *task = scc->task;
1746  struct SetKey dst_set_key = scc->dst_set_key;
1747  struct SetEntry *set;
1748  struct SetHandle *sh = GNUNET_new (struct SetHandle);
1749 
1750  sh->h = copy;
1752  task->step->session->set_handles_tail,
1753  sh);
1754 
1755  GNUNET_free (scc);
1756  set = GNUNET_new (struct SetEntry);
1757  set->h = copy;
1758  set->key = dst_set_key;
1759  put_set (task->step->session, set);
1760 
1761  task->start (task);
1762 }
1763 
1764 
1769 static void
1771  struct SetKey *src_set_key,
1772  struct SetKey *dst_set_key)
1773 {
1774  struct SetEntry *src_set;
1775  struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1776 
1778  "Copying set {%s} to {%s} for task {%s}\n",
1779  debug_str_set_key (src_set_key),
1780  debug_str_set_key (dst_set_key),
1781  debug_str_task_key (&task->key));
1782 
1783  scc->task = task;
1784  scc->dst_set_key = *dst_set_key;
1785  src_set = lookup_set (task->step->session, src_set_key);
1786  GNUNET_assert (NULL != src_set);
1787  GNUNET_SET_copy_lazy (src_set->h,
1788  set_copy_cb,
1789  scc);
1790 }
1791 
1792 
1794 {
1799  struct TaskEntry *task;
1800 };
1801 
1802 
1803 static void
1805 {
1806  struct SetMutationProgressCls *pc = cls;
1807 
1808  GNUNET_assert (pc->num_pending > 0);
1809 
1810  pc->num_pending--;
1811 
1812  if (0 == pc->num_pending)
1813  {
1814  struct TaskEntry *task = pc->task;
1815  GNUNET_free (pc);
1816  finish_task (task);
1817  }
1818 }
1819 
1820 
1821 static void
1823 {
1824  unsigned int i;
1825 
1826  if (GNUNET_YES == step->is_running)
1827  return;
1828  if (GNUNET_YES == step->is_finished)
1829  return;
1830  if (GNUNET_NO == step->early_finishable)
1831  return;
1832 
1833  step->is_finished = GNUNET_YES;
1834 
1835 #ifdef GNUNET_EXTRA_LOGGING
1837  "Finishing step `%s' early.\n",
1838  step->debug_name);
1839 #endif
1840 
1841  for (i = 0; i < step->subordinates_len; i++)
1842  {
1843  GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1844  step->subordinates[i]->pending_prereq--;
1845 #ifdef GNUNET_EXTRA_LOGGING
1847  "Decreased pending_prereq to %u for step `%s'.\n",
1848  (unsigned int) step->subordinates[i]->pending_prereq,
1849  step->subordinates[i]->debug_name);
1850 #endif
1852  }
1853 
1854  // XXX: maybe schedule as task to avoid recursion?
1855  run_ready_steps (step->session);
1856 }
1857 
1858 
1859 static void
1861 {
1862  unsigned int i;
1863 
1864  GNUNET_assert (step->finished_tasks == step->tasks_len);
1865  GNUNET_assert (GNUNET_YES == step->is_running);
1866  GNUNET_assert (GNUNET_NO == step->is_finished);
1867 
1868 #ifdef GNUNET_EXTRA_LOGGING
1870  "All tasks of step `%s' with %u subordinates finished.\n",
1871  step->debug_name,
1872  step->subordinates_len);
1873 #endif
1874 
1875  for (i = 0; i < step->subordinates_len; i++)
1876  {
1877  GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1878  step->subordinates[i]->pending_prereq--;
1879 #ifdef GNUNET_EXTRA_LOGGING
1881  "Decreased pending_prereq to %u for step `%s'.\n",
1882  (unsigned int) step->subordinates[i]->pending_prereq,
1883  step->subordinates[i]->debug_name);
1884 #endif
1885  }
1886 
1887  step->is_finished = GNUNET_YES;
1888 
1889  // XXX: maybe schedule as task to avoid recursion?
1890  run_ready_steps (step->session);
1891 }
1892 
1893 
1900 static void
1902 {
1903  struct ConsensusSession *session = task->step->session;
1904  struct SetKey sk_in;
1905  struct SetKey sk_out;
1906  struct RfnKey rk_in;
1907  struct SetEntry *set_out;
1908  struct ReferendumEntry *rfn_in;
1910  struct RfnElementInfo *ri;
1911  struct SetMutationProgressCls *progress_cls;
1912  uint16_t worst_majority = UINT16_MAX;
1913 
1914  sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1915  rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1916  sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1917 
1918  set_out = lookup_set (session, &sk_out);
1919  if (NULL == set_out)
1920  {
1921  create_set_copy_for_task (task, &sk_in, &sk_out);
1922  return;
1923  }
1924 
1925  rfn_in = lookup_rfn (session, &rk_in);
1926  GNUNET_assert (NULL != rfn_in);
1927 
1928  progress_cls = GNUNET_new (struct SetMutationProgressCls);
1929  progress_cls->task = task;
1930 
1932 
1933  while (GNUNET_YES ==
1935  NULL,
1936  (const void **) &ri))
1937  {
1938  uint16_t majority_num;
1939  enum ReferendumVote majority_vote;
1940 
1941  rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1942 
1943  if (worst_majority > majority_num)
1944  worst_majority = majority_num;
1945 
1946  switch (majority_vote)
1947  {
1948  case VOTE_ADD:
1949  progress_cls->num_pending++;
1951  GNUNET_SET_add_element (set_out->h,
1952  ri->element,
1954  progress_cls));
1956  "P%u: apply round: adding element %s with %u-majority.\n",
1957  session->local_peer_idx,
1958  debug_str_element (ri->element), majority_num);
1959  break;
1960 
1961  case VOTE_REMOVE:
1962  progress_cls->num_pending++;
1964  GNUNET_SET_remove_element (set_out->h,
1965  ri->element,
1967  progress_cls));
1969  "P%u: apply round: deleting element %s with %u-majority.\n",
1970  session->local_peer_idx,
1971  debug_str_element (ri->element), majority_num);
1972  break;
1973 
1974  case VOTE_STAY:
1976  "P%u: apply round: keeping element %s with %u-majority.\n",
1977  session->local_peer_idx,
1978  debug_str_element (ri->element), majority_num);
1979  // do nothing
1980  break;
1981 
1982  default:
1983  GNUNET_assert (0);
1984  break;
1985  }
1986  }
1987 
1988  if (0 == progress_cls->num_pending)
1989  {
1990  // call closure right now, no pending ops
1991  GNUNET_free (progress_cls);
1992  finish_task (task);
1993  }
1994 
1995  {
1996  uint16_t thresh = (session->num_peers / 3) * 2;
1997 
1998  if (worst_majority >= thresh)
1999  {
2000  switch (session->early_stopping)
2001  {
2002  case EARLY_STOPPING_NONE:
2005  "P%u: Stopping early (after one more superround)\n",
2006  session->local_peer_idx);
2007  break;
2008 
2011  "P%u: finishing steps due to early finish\n",
2012  session->local_peer_idx);
2014  {
2015  struct Step *step;
2016  for (step = session->steps_head; NULL != step; step = step->next)
2017  try_finish_step_early (step);
2018  }
2019  break;
2020 
2021  case EARLY_STOPPING_DONE:
2022  /* We shouldn't be here anymore after early stopping */
2023  GNUNET_break (0);
2024  break;
2025 
2026  default:
2027  GNUNET_assert (0);
2028  break;
2029  }
2030  }
2031  else if (EARLY_STOPPING_NONE != session->early_stopping)
2032  {
2033  // Our assumption about the number of bad peers
2034  // has been broken.
2035  GNUNET_break_op (0);
2036  }
2037  else
2038  {
2040  "P%u: NOT finishing early (majority not good enough)\n",
2041  session->local_peer_idx);
2042  }
2043  }
2045 }
2046 
2047 
2048 static void
2050 {
2051  struct ConsensusSession *session = task->step->session;
2052  struct ReferendumEntry *output_rfn;
2053  struct ReferendumEntry *input_rfn;
2054  struct DiffEntry *input_diff;
2055  struct RfnKey rfn_key;
2056  struct DiffKey diff_key;
2058  struct RfnElementInfo *ri;
2059  unsigned int gradecast_confidence = 2;
2060 
2061  rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
2062  output_rfn = lookup_rfn (session, &rfn_key);
2063  if (NULL == output_rfn)
2064  {
2065  output_rfn = rfn_create (session->num_peers);
2066  output_rfn->key = rfn_key;
2067  put_rfn (session, output_rfn);
2068  }
2069 
2070  diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition,
2071  task->key.leader };
2072  input_diff = lookup_diff (session, &diff_key);
2073  GNUNET_assert (NULL != input_diff);
2074 
2075  rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2076  task->key.leader };
2077  input_rfn = lookup_rfn (session, &rfn_key);
2078  GNUNET_assert (NULL != input_rfn);
2079 
2081  input_rfn->rfn_elements);
2082 
2083  apply_diff_to_rfn (input_diff, output_rfn, task->key.leader,
2084  session->num_peers);
2085 
2086  while (GNUNET_YES ==
2088  NULL,
2089  (const void **) &ri))
2090  {
2091  uint16_t majority_num;
2092  enum ReferendumVote majority_vote;
2093 
2094  // XXX: we need contested votes and non-contested votes here
2095  rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2096 
2097  if (majority_num <= session->num_peers / 3)
2098  majority_vote = VOTE_REMOVE;
2099 
2100  switch (majority_vote)
2101  {
2102  case VOTE_STAY:
2103  break;
2104 
2105  case VOTE_ADD:
2106  rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2107  break;
2108 
2109  case VOTE_REMOVE:
2110  rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2111  break;
2112 
2113  default:
2114  GNUNET_assert (0);
2115  break;
2116  }
2117  }
2119 
2120  {
2121  uint16_t noncontested;
2122  noncontested = rfn_noncontested (input_rfn);
2123  if (noncontested < (session->num_peers / 3) * 2)
2124  {
2125  gradecast_confidence = GNUNET_MIN (1, gradecast_confidence);
2126  }
2127  if (noncontested < (session->num_peers / 3) + 1)
2128  {
2129  gradecast_confidence = 0;
2130  }
2131  }
2132 
2133  if (gradecast_confidence >= 1)
2134  rfn_commit (output_rfn, task->key.leader);
2135 
2136  if (gradecast_confidence <= 1)
2137  session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2138 
2139  finish_task (task);
2140 }
2141 
2142 
2143 static void
2145 {
2146  struct SetEntry *input;
2147  struct SetOpCls *setop = &task->cls.setop;
2148  struct ConsensusSession *session = task->step->session;
2149 
2150  input = lookup_set (session, &setop->input_set);
2151  GNUNET_assert (NULL != input);
2152  GNUNET_assert (NULL != input->h);
2153 
2154  /* We create the outputs for the operation here
2155  (rather than in the set operation callback)
2156  because we want something valid in there, even
2157  if the other peer doesn't talk to us */
2158 
2159  if (SET_KIND_NONE != setop->output_set.set_kind)
2160  {
2161  /* If we don't have an existing output set,
2162  we clone the input set. */
2163  if (NULL == lookup_set (session, &setop->output_set))
2164  {
2165  create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2166  return;
2167  }
2168  }
2169 
2170  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2171  {
2172  if (NULL == lookup_rfn (session, &setop->output_rfn))
2173  {
2174  struct ReferendumEntry *rfn;
2175 
2177  "P%u: output rfn <%s> missing, creating.\n",
2178  session->local_peer_idx,
2179  debug_str_rfn_key (&setop->output_rfn));
2180 
2181  rfn = rfn_create (session->num_peers);
2182  rfn->key = setop->output_rfn;
2183  put_rfn (session, rfn);
2184  }
2185  }
2186 
2187  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2188  {
2189  if (NULL == lookup_diff (session, &setop->output_diff))
2190  {
2191  struct DiffEntry *diff;
2192 
2193  diff = diff_create ();
2194  diff->key = setop->output_diff;
2195  put_diff (session, diff);
2196  }
2197  }
2198 
2199  if ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 ==
2200  session->local_peer_idx))
2201  {
2202  /* XXX: mark the corresponding rfn as commited if necessary */
2203  finish_task (task);
2204  return;
2205  }
2206 
2207  if (task->key.peer1 == session->local_peer_idx)
2208  {
2210 
2212  "P%u: Looking up set {%s} to run remote union\n",
2213  session->local_peer_idx,
2214  debug_str_set_key (&setop->input_set));
2215 
2217  rcm.header.size = htons (sizeof(struct
2219 
2220  rcm.kind = htons (task->key.kind);
2221  rcm.peer1 = htons (task->key.peer1);
2222  rcm.peer2 = htons (task->key.peer2);
2223  rcm.leader = htons (task->key.leader);
2224  rcm.repetition = htons (task->key.repetition);
2225  rcm.is_contested = htons (0);
2226 
2227  GNUNET_assert (NULL == setop->op);
2229  "P%u: initiating set op with P%u, our set is %s\n",
2230  session->local_peer_idx, task->key.peer2, debug_str_set_key (
2231  &setop->input_set));
2232 
2233  struct GNUNET_SET_Option opts[] = {
2234  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2236  };
2237 
2238  // XXX: maybe this should be done while
2239  // setting up tasks alreays?
2240  setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2241  &session->global_id,
2242  &rcm.header,
2244  opts,
2245  set_result_cb,
2246  task);
2247 
2248  commit_set (session, task);
2249  }
2250  else if (task->key.peer2 == session->local_peer_idx)
2251  {
2252  /* Wait for the other peer to contact us */
2253  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2254  session->local_peer_idx, task->key.peer1);
2255 
2256  if (NULL != setop->op)
2257  {
2258  commit_set (session, task);
2259  }
2260  }
2261  else
2262  {
2263  /* We made an error while constructing the task graph. */
2264  GNUNET_assert (0);
2265  }
2266 }
2267 
2268 
2269 static void
2271 {
2273  struct ReferendumEntry *input_rfn;
2274  struct RfnElementInfo *ri;
2275  struct SetEntry *output_set;
2276  struct SetMutationProgressCls *progress_cls;
2277  struct ConsensusSession *session = task->step->session;
2278  struct SetKey sk_in;
2279  struct SetKey sk_out;
2280  struct RfnKey rk_in;
2281 
2282  sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition,
2283  task->key.leader };
2284  sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition,
2285  task->key.leader };
2286  output_set = lookup_set (session, &sk_out);
2287  if (NULL == output_set)
2288  {
2289  create_set_copy_for_task (task, &sk_in, &sk_out);
2290  return;
2291  }
2292 
2293 
2294  {
2295  // FIXME: should be marked as a shallow copy, so
2296  // we can destroy everything correctly
2297  struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2298  last_set->h = output_set->h;
2299  last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2300  put_set (session, last_set);
2301  }
2302 
2304  "Evaluating referendum in Task {%s}\n",
2305  debug_str_task_key (&task->key));
2306 
2307  progress_cls = GNUNET_new (struct SetMutationProgressCls);
2308  progress_cls->task = task;
2309 
2310  rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition,
2311  task->key.leader };
2312  input_rfn = lookup_rfn (session, &rk_in);
2313 
2314  GNUNET_assert (NULL != input_rfn);
2315 
2317  input_rfn->rfn_elements);
2318  GNUNET_assert (NULL != iter);
2319 
2320  while (GNUNET_YES ==
2322  NULL,
2323  (const void **) &ri))
2324  {
2325  enum ReferendumVote majority_vote;
2326  uint16_t majority_num;
2327 
2328  rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2329 
2330  if (majority_num < session->num_peers / 3)
2331  {
2332  /* It is not the case that all nonfaulty peers
2333  echoed the same value. Since we're doing a set reconciliation, we
2334  can't simply send "nothing" for the value. Thus we mark our 'confirm'
2335  reconciliation as contested. Other peers might not know that the
2336  leader is faulty, thus we still re-distribute in the confirmation
2337  round. */output_set->is_contested = GNUNET_YES;
2338  }
2339 
2340  switch (majority_vote)
2341  {
2342  case VOTE_ADD:
2343  progress_cls->num_pending++;
2345  GNUNET_SET_add_element (output_set->h,
2346  ri->element,
2348  progress_cls));
2349  break;
2350 
2351  case VOTE_REMOVE:
2352  progress_cls->num_pending++;
2354  GNUNET_SET_remove_element (output_set->h,
2355  ri->element,
2357  progress_cls));
2358  break;
2359 
2360  case VOTE_STAY:
2361  /* Nothing to do. */
2362  break;
2363 
2364  default:
2365  /* not reached */
2366  GNUNET_assert (0);
2367  }
2368  }
2369 
2371 
2372  if (0 == progress_cls->num_pending)
2373  {
2374  // call closure right now, no pending ops
2375  GNUNET_free (progress_cls);
2376  finish_task (task);
2377  }
2378 }
2379 
2380 
2381 static void
2383 {
2384  struct SetEntry *final_set;
2385  struct ConsensusSession *session = task->step->session;
2386 
2387  final_set = lookup_set (session, &task->cls.finish.input_set);
2388 
2389  GNUNET_assert (NULL != final_set);
2390 
2391 
2392  GNUNET_SET_iterate (final_set->h,
2394  task);
2395 }
2396 
2397 
2398 static void
2399 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2400 {
2401  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n",
2402  session->local_peer_idx, debug_str_task_key (&task->key));
2403 
2404  GNUNET_assert (GNUNET_NO == task->is_started);
2405  GNUNET_assert (GNUNET_NO == task->is_finished);
2406  GNUNET_assert (NULL != task->start);
2407 
2408  task->start (task);
2409 
2410  task->is_started = GNUNET_YES;
2411 }
2412 
2413 
2414 /*
2415  * Run all steps of the session that don't any
2416  * more dependencies.
2417  */
2418 static void
2420 {
2421  struct Step *step;
2422 
2423  step = session->steps_head;
2424 
2425  while (NULL != step)
2426  {
2427  if ((GNUNET_NO == step->is_running) && (0 == step->pending_prereq) &&
2428  (GNUNET_NO == step->is_finished))
2429  {
2430  size_t i;
2431 
2432  GNUNET_assert (0 == step->finished_tasks);
2433 
2434 #ifdef GNUNET_EXTRA_LOGGING
2436  "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2437  session->local_peer_idx,
2438  step->debug_name,
2439  step->round, step->tasks_len, step->subordinates_len);
2440 #endif
2441 
2442  step->is_running = GNUNET_YES;
2443  for (i = 0; i < step->tasks_len; i++)
2444  start_task (session, step->tasks[i]);
2445 
2446  /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2447  if ((step->finished_tasks == step->tasks_len) && (GNUNET_NO ==
2448  step->is_finished))
2449  finish_step (step);
2450 
2451  /* Running the next ready steps will be triggered by task completion */
2452  return;
2453  }
2454  step = step->next;
2455  }
2456 
2457  return;
2458 }
2459 
2460 
2461 static void
2462 finish_task (struct TaskEntry *task)
2463 {
2464  GNUNET_assert (GNUNET_NO == task->is_finished);
2465  task->is_finished = GNUNET_YES;
2466 
2467  task->step->finished_tasks++;
2468 
2470  "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2471  task->step->session->local_peer_idx,
2472  debug_str_task_key (&task->key),
2473  (unsigned int) task->step->finished_tasks,
2474  (unsigned int) task->step->tasks_len);
2475 
2476  if (task->step->finished_tasks == task->step->tasks_len)
2477  finish_step (task->step);
2478 }
2479 
2480 
2488 static int
2489 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct
2490  ConsensusSession *session)
2491 {
2492  int i;
2493 
2494  for (i = 0; i < session->num_peers; i++)
2495  if (0 == GNUNET_memcmp (peer, &session->peers[i]))
2496  return i;
2497  return -1;
2498 }
2499 
2500 
2510 static void
2512  const struct GNUNET_HashCode *local_session_id)
2513 {
2514  const char *salt = "gnunet-service-consensus/session_id";
2515 
2517  GNUNET_CRYPTO_kdf (&session->global_id,
2518  sizeof(struct GNUNET_HashCode),
2519  salt,
2520  strlen (salt),
2521  session->peers,
2522  session->num_peers * sizeof(struct
2524  local_session_id,
2525  sizeof(struct GNUNET_HashCode),
2526  NULL));
2527 }
2528 
2529 
2537 static int
2538 peer_id_cmp (const void *h1, const void *h2)
2539 {
2540  return memcmp (h1, h2, sizeof(struct GNUNET_PeerIdentity));
2541 }
2542 
2543 
2551 static void
2553  const struct
2554  GNUNET_CONSENSUS_JoinMessage *join_msg)
2555 {
2556  const struct GNUNET_PeerIdentity *msg_peers
2557  = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2558  int local_peer_in_list;
2559 
2560  session->num_peers = ntohl (join_msg->num_peers);
2561 
2562  /* Peers in the join message, may or may not include the local peer,
2563  Add it if it is missing. */
2564  local_peer_in_list = GNUNET_NO;
2565  for (unsigned int i = 0; i < session->num_peers; i++)
2566  {
2567  if (0 == GNUNET_memcmp (&msg_peers[i],
2568  &my_peer))
2569  {
2570  local_peer_in_list = GNUNET_YES;
2571  break;
2572  }
2573  }
2574  if (GNUNET_NO == local_peer_in_list)
2575  session->num_peers++;
2576 
2577  session->peers = GNUNET_new_array (session->num_peers,
2578  struct GNUNET_PeerIdentity);
2579  if (GNUNET_NO == local_peer_in_list)
2580  session->peers[session->num_peers - 1] = my_peer;
2581 
2582  GNUNET_memcpy (session->peers,
2583  msg_peers,
2584  ntohl (join_msg->num_peers) * sizeof(struct
2586  qsort (session->peers,
2587  session->num_peers,
2588  sizeof(struct GNUNET_PeerIdentity),
2589  &peer_id_cmp);
2590 }
2591 
2592 
2593 static struct TaskEntry *
2595  struct TaskKey *key)
2596 {
2597  struct GNUNET_HashCode hash;
2598 
2599 
2600  GNUNET_CRYPTO_hash (key, sizeof(struct TaskKey), &hash);
2601  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2602  GNUNET_h2s (&hash));
2603  return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2604 }
2605 
2606 
2622 static void
2623 set_listen_cb (void *cls,
2624  const struct GNUNET_PeerIdentity *other_peer,
2625  const struct GNUNET_MessageHeader *context_msg,
2626  struct GNUNET_SET_Request *request)
2627 {
2628  struct ConsensusSession *session = cls;
2629  struct TaskKey tk;
2630  struct TaskEntry *task;
2632 
2633  if (NULL == context_msg)
2634  {
2635  GNUNET_break_op (0);
2636  return;
2637  }
2638 
2640  context_msg->type))
2641  {
2642  GNUNET_break_op (0);
2643  return;
2644  }
2645 
2646  if (sizeof(struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (
2647  context_msg->size))
2648  {
2649  GNUNET_break_op (0);
2650  return;
2651  }
2652 
2653  cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2654 
2655  tk = ((struct TaskKey) {
2656  .kind = ntohs (cm->kind),
2657  .peer1 = ntohs (cm->peer1),
2658  .peer2 = ntohs (cm->peer2),
2659  .repetition = ntohs (cm->repetition),
2660  .leader = ntohs (cm->leader),
2661  });
2662 
2663  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2664  session->local_peer_idx, debug_str_task_key (&tk));
2665 
2666  task = lookup_task (session, &tk);
2667 
2668  if (NULL == task)
2669  {
2670  GNUNET_break_op (0);
2671  return;
2672  }
2673 
2674  if (GNUNET_YES == task->is_finished)
2675  {
2676  GNUNET_break_op (0);
2677  return;
2678  }
2679 
2680  if (task->key.peer2 != session->local_peer_idx)
2681  {
2682  /* We're being asked, so we must be thne 2nd peer. */
2683  GNUNET_break_op (0);
2684  return;
2685  }
2686 
2687  GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2688  (task->key.peer2 == session->local_peer_idx)));
2689 
2690  struct GNUNET_SET_Option opts[] = {
2691  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2693  };
2694 
2695  task->cls.setop.op = GNUNET_SET_accept (request,
2697  opts,
2698  set_result_cb,
2699  task);
2700 
2701  /* If the task hasn't been started yet,
2702  we wait for that until we commit. */
2703 
2704  if (GNUNET_YES == task->is_started)
2705  {
2706  commit_set (session, task);
2707  }
2708 }
2709 
2710 
2711 static void
2713  struct TaskEntry *t)
2714 {
2715  struct GNUNET_HashCode round_hash;
2716  struct Step *s;
2717 
2718  GNUNET_assert (NULL != t->step);
2719 
2720  t = GNUNET_memdup (t, sizeof(struct TaskEntry));
2721 
2722  s = t->step;
2723 
2724  if (s->tasks_len == s->tasks_cap)
2725  {
2726  unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2728  s->tasks_cap,
2729  target_size);
2730  }
2731 
2732 #ifdef GNUNET_EXTRA_LOGGING
2733  GNUNET_assert (NULL != s->debug_name);
2734  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2735  debug_str_task_key (&t->key),
2736  s->debug_name);
2737 #endif
2738 
2739  s->tasks[s->tasks_len] = t;
2740  s->tasks_len++;
2741 
2742  GNUNET_CRYPTO_hash (&t->key, sizeof(struct TaskKey), &round_hash);
2744  GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2746 }
2747 
2748 
2749 static void
2751 {
2752  /* Given the fully constructed task graph
2753  with rounds for tasks, we can give the tasks timeouts. */
2754 
2755  // unsigned int max_round;
2756 
2757  /* XXX: implement! */
2758 }
2759 
2760 
2761 /*
2762  * Arrange two peers in some canonical order.
2763  */
2764 static void
2765 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2766 {
2767  uint16_t a;
2768  uint16_t b;
2769 
2770  GNUNET_assert (*p1 < n);
2771  GNUNET_assert (*p2 < n);
2772 
2773  if (*p1 < *p2)
2774  {
2775  a = *p1;
2776  b = *p2;
2777  }
2778  else
2779  {
2780  a = *p2;
2781  b = *p1;
2782  }
2783 
2784  /* For uniformly random *p1, *p2,
2785  this condition is true with 50% chance */
2786  if (((b - a) + n) % n <= n / 2)
2787  {
2788  *p1 = a;
2789  *p2 = b;
2790  }
2791  else
2792  {
2793  *p1 = b;
2794  *p2 = a;
2795  }
2796 }
2797 
2798 
2802 static void
2803 step_depend_on (struct Step *step, struct Step *dep)
2804 {
2805  /* We're not checking for cyclic dependencies,
2806  but this is a cheap sanity check. */
2807  GNUNET_assert (step != dep);
2808  GNUNET_assert (NULL != step);
2809  GNUNET_assert (NULL != dep);
2810  GNUNET_assert (dep->round <= step->round);
2811 
2812 #ifdef GNUNET_EXTRA_LOGGING
2813  /* Make sure we have complete debugging information.
2814  Also checks that we don't screw up too badly
2815  constructing the task graph. */
2816  GNUNET_assert (NULL != step->debug_name);
2817  GNUNET_assert (NULL != dep->debug_name);
2819  "Making step `%s' depend on `%s'\n",
2820  step->debug_name,
2821  dep->debug_name);
2822 #endif
2823 
2824  if (dep->subordinates_cap == dep->subordinates_len)
2825  {
2826  unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2828  dep->subordinates_cap,
2829  target_size);
2830  }
2831 
2833 
2834  dep->subordinates[dep->subordinates_len] = step;
2835  dep->subordinates_len++;
2836 
2837  step->pending_prereq++;
2838 }
2839 
2840 
2841 static struct Step *
2843 {
2844  struct Step *step;
2845 
2846  step = GNUNET_new (struct Step);
2847  step->session = session;
2848  step->round = round;
2851  session->steps_tail,
2852  step);
2853  return step;
2854 }
2855 
2856 
2861 static void
2863  uint16_t rep,
2864  uint16_t lead,
2865  struct Step *step_before,
2866  struct Step *step_after)
2867 {
2868  uint16_t n = session->num_peers;
2869  uint16_t me = session->local_peer_idx;
2870 
2871  uint16_t p1;
2872  uint16_t p2;
2873 
2874  /* The task we're currently setting up. */
2875  struct TaskEntry task;
2876 
2877  struct Step *step;
2878  struct Step *prev_step;
2879 
2880  uint16_t round;
2881 
2882  unsigned int k;
2883 
2884  round = step_before->round + 1;
2885 
2886  /* gcast step 1: leader disseminates */
2887 
2888  step = create_step (session, round, GNUNET_YES);
2889 
2890 #ifdef GNUNET_EXTRA_LOGGING
2891  GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead,
2892  rep);
2893 #endif
2894  step_depend_on (step, step_before);
2895 
2896  if (lead == me)
2897  {
2898  for (k = 0; k < n; k++)
2899  {
2900  if (k == me)
2901  continue;
2902  p1 = me;
2903  p2 = k;
2904  arrange_peers (&p1, &p2, n);
2905  task = ((struct TaskEntry) {
2906  .step = step,
2907  .start = task_start_reconcile,
2908  .cancel = task_cancel_reconcile,
2909  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2910  me },
2911  });
2912  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2913  put_task (session->taskmap, &task);
2914  }
2915  /* We run this task to make sure that the leader
2916  has the stored the SET_KIND_LEADER set of himself,
2917  so it can participate in the rest of the gradecast
2918  without the code having to handle any special cases. */
2919  task = ((struct TaskEntry) {
2920  .step = step,
2921  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2922  .start = task_start_reconcile,
2923  .cancel = task_cancel_reconcile,
2924  });
2925  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2926  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2927  me };
2929  rep, me };
2930  put_task (session->taskmap, &task);
2931  }
2932  else
2933  {
2934  p1 = me;
2935  p2 = lead;
2936  arrange_peers (&p1, &p2, n);
2937  task = ((struct TaskEntry) {
2938  .step = step,
2939  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep,
2940  lead },
2941  .start = task_start_reconcile,
2942  .cancel = task_cancel_reconcile,
2943  });
2944  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2945  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2946  lead };
2948  rep, lead };
2949  put_task (session->taskmap, &task);
2950  }
2951 
2952  /* gcast phase 2: echo */
2953  prev_step = step;
2954  round += 1;
2955  step = create_step (session, round, GNUNET_YES);
2956 #ifdef GNUNET_EXTRA_LOGGING
2957  GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2958 #endif
2959  step_depend_on (step, prev_step);
2960 
2961  for (k = 0; k < n; k++)
2962  {
2963  p1 = k;
2964  p2 = me;
2965  arrange_peers (&p1, &p2, n);
2966  task = ((struct TaskEntry) {
2967  .step = step,
2968  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2969  .start = task_start_reconcile,
2970  .cancel = task_cancel_reconcile,
2971  });
2972  task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep,
2973  lead };
2974  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2975  put_task (session->taskmap, &task);
2976  }
2977 
2978  prev_step = step;
2979  /* Same round, since step only has local tasks */
2980  step = create_step (session, round, GNUNET_YES);
2981 #ifdef GNUNET_EXTRA_LOGGING
2982  GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2983 #endif
2984  step_depend_on (step, prev_step);
2985 
2986  arrange_peers (&p1, &p2, n);
2987  task = ((struct TaskEntry) {
2988  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep,
2989  lead },
2990  .step = step,
2991  .start = task_start_eval_echo
2992  });
2993  put_task (session->taskmap, &task);
2994 
2995  prev_step = step;
2996  round += 1;
2997  step = create_step (session, round, GNUNET_YES);
2998 #ifdef GNUNET_EXTRA_LOGGING
2999  GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
3000 #endif
3001  step_depend_on (step, prev_step);
3002 
3003  /* gcast phase 3: confirmation and grading */
3004  for (k = 0; k < n; k++)
3005  {
3006  p1 = k;
3007  p2 = me;
3008  arrange_peers (&p1, &p2, n);
3009  task = ((struct TaskEntry) {
3010  .step = step,
3011  .start = task_start_reconcile,
3012  .cancel = task_cancel_reconcile,
3013  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep,
3014  lead },
3015  });
3016  task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep,
3017  lead };
3018  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
3019  /* If there was at least one element in the echo round that was
3020  contested (i.e. it had no n-t majority), then we let the other peers
3021  know, and other peers let us know. The contested flag for each peer is
3022  stored in the rfn. */
3024  put_task (session->taskmap, &task);
3025  }
3026 
3027  prev_step = step;
3028  /* Same round, since step only has local tasks */
3029  step = create_step (session, round, GNUNET_YES);
3030 #ifdef GNUNET_EXTRA_LOGGING
3031  GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead,
3032  rep);
3033 #endif
3034  step_depend_on (step, prev_step);
3035 
3036  task = ((struct TaskEntry) {
3037  .step = step,
3038  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep,
3039  lead },
3040  .start = task_start_grade,
3041  });
3042  put_task (session->taskmap, &task);
3043 
3044  step_depend_on (step_after, step);
3045 }
3046 
3047 
3048 static void
3050 {
3051  uint16_t n = session->num_peers;
3052  uint16_t t = n / 3;
3053 
3054  uint16_t me = session->local_peer_idx;
3055 
3056  /* The task we're currently setting up. */
3057  struct TaskEntry task;
3058 
3059  /* Current leader */
3060  unsigned int lead;
3061 
3062  struct Step *step;
3063  struct Step *prev_step;
3064 
3065  unsigned int round = 0;
3066 
3067  unsigned int i;
3068 
3069  // XXX: introduce first step,
3070  // where we wait for all insert acks
3071  // from the set service
3072 
3073  /* faster but brittle all-to-all */
3074 
3075  // XXX: Not implemented yet
3076 
3077  /* all-to-all step */
3078 
3079  step = create_step (session, round, GNUNET_NO);
3080 
3081 #ifdef GNUNET_EXTRA_LOGGING
3082  step->debug_name = GNUNET_strdup ("all to all");
3083 #endif
3084 
3085  for (i = 0; i < n; i++)
3086  {
3087  uint16_t p1;
3088  uint16_t p2;
3089 
3090  p1 = me;
3091  p2 = i;
3092  arrange_peers (&p1, &p2, n);
3093  task = ((struct TaskEntry) {
3094  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
3095  .step = step,
3096  .start = task_start_reconcile,
3097  .cancel = task_cancel_reconcile,
3098  });
3099  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3100  task.cls.setop.output_set = task.cls.setop.input_set;
3102  put_task (session->taskmap, &task);
3103  }
3104 
3105  round += 1;
3106  prev_step = step;
3107  step = create_step (session, round, GNUNET_NO);;
3108 #ifdef GNUNET_EXTRA_LOGGING
3109  step->debug_name = GNUNET_strdup ("all to all 2");
3110 #endif
3111  step_depend_on (step, prev_step);
3112 
3113 
3114  for (i = 0; i < n; i++)
3115  {
3116  uint16_t p1;
3117  uint16_t p2;
3118 
3119  p1 = me;
3120  p2 = i;
3121  arrange_peers (&p1, &p2, n);
3122  task = ((struct TaskEntry) {
3123  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3124  .step = step,
3125  .start = task_start_reconcile,
3126  .cancel = task_cancel_reconcile,
3127  });
3128  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3129  task.cls.setop.output_set = task.cls.setop.input_set;
3131  put_task (session->taskmap, &task);
3132  }
3133 
3134  round += 1;
3135 
3136  prev_step = step;
3137  step = NULL;
3138 
3139 
3140  /* Byzantine union */
3141 
3142  /* sequential repetitions of the gradecasts */
3143  for (i = 0; i < t + 1; i++)
3144  {
3145  struct Step *step_rep_start;
3146  struct Step *step_rep_end;
3147 
3148  /* Every repetition is in a separate round. */
3149  step_rep_start = create_step (session, round, GNUNET_YES);
3150 #ifdef GNUNET_EXTRA_LOGGING
3151  GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3152 #endif
3153 
3154  step_depend_on (step_rep_start, prev_step);
3155 
3156  /* gradecast has three rounds */
3157  round += 3;
3158  step_rep_end = create_step (session, round, GNUNET_YES);
3159 #ifdef GNUNET_EXTRA_LOGGING
3160  GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3161 #endif
3162 
3163  /* parallel gradecasts */
3164  for (lead = 0; lead < n; lead++)
3165  construct_task_graph_gradecast (session, i, lead, step_rep_start,
3166  step_rep_end);
3167 
3168  task = ((struct TaskEntry) {
3169  .step = step_rep_end,
3170  .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1 },
3171  .start = task_start_apply_round,
3172  });
3173  put_task (session->taskmap, &task);
3174 
3175  prev_step = step_rep_end;
3176  }
3177 
3178  /* There is no next gradecast round, thus the final
3179  start step is the overall end step of the gradecasts */
3180  round += 1;
3181  step = create_step (session, round, GNUNET_NO);
3182 #ifdef GNUNET_EXTRA_LOGGING
3183  GNUNET_asprintf (&step->debug_name, "finish");
3184 #endif
3185  step_depend_on (step, prev_step);
3186 
3187  task = ((struct TaskEntry) {
3188  .step = step,
3189  .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3190  .start = task_start_finish,
3191  });
3192  task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3193 
3194  put_task (session->taskmap, &task);
3195 }
3196 
3197 
3205 static int
3207  const struct GNUNET_CONSENSUS_JoinMessage *m)
3208 {
3209  uint32_t listed_peers = ntohl (m->num_peers);
3210 
3211  if ((ntohs (m->header.size) - sizeof(*m)) !=
3212  listed_peers * sizeof(struct GNUNET_PeerIdentity))
3213  {
3214  GNUNET_break (0);
3215  return GNUNET_SYSERR;
3216  }
3217  return GNUNET_OK;
3218 }
3219 
3220 
3227 static void
3229  const struct GNUNET_CONSENSUS_JoinMessage *m)
3230 {
3231  struct ConsensusSession *session = cls;
3232  struct ConsensusSession *other_session;
3233 
3235  m);
3236  compute_global_id (session,
3237  &m->session_id);
3238 
3239  /* Check if some local client already owns the session.
3240  It is only legal to have a session with an existing global id
3241  if all other sessions with this global id are finished.*/
3242  for (other_session = sessions_head;
3243  NULL != other_session;
3244  other_session = other_session->next)
3245  {
3246  if ((other_session != session) &&
3247  (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3248  &other_session->global_id)))
3249  break;
3250  }
3251 
3252  session->conclude_deadline
3254  session->conclude_start
3256  session->local_peer_idx = get_peer_idx (&my_peer,
3257  session);
3258  GNUNET_assert (-1 != session->local_peer_idx);
3259 
3261  "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3262  GNUNET_h2s (&m->session_id),
3263  session->num_peers,
3264  session->local_peer_idx,
3267  session->conclude_deadline),
3268  GNUNET_YES));
3269 
3270  session->set_listener
3271  = GNUNET_SET_listen (cfg,
3273  &session->global_id,
3274  &set_listen_cb,
3275  session);
3276 
3278  GNUNET_NO);
3280  GNUNET_NO);
3282  GNUNET_NO);
3284  GNUNET_NO);
3285 
3286  {
3287  struct SetEntry *client_set;
3288 
3289  client_set = GNUNET_new (struct SetEntry);
3290  client_set->h = GNUNET_SET_create (cfg,
3292  struct SetHandle *sh = GNUNET_new (struct SetHandle);
3293  sh->h = client_set->h;
3295  session->set_handles_tail,
3296  sh);
3297  client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3298  put_set (session,
3299  client_set);
3300  }
3301 
3302  session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3303  int);
3304 
3305  /* Just construct the task graph,
3306  but don't run anything until the client calls conclude. */
3307  construct_task_graph (session);
3309 }
3310 
3311 
3312 static void
3314 {
3315  // FIXME: implement
3316 }
3317 
3318 
3326 static int
3328  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3329 {
3330  return GNUNET_OK;
3331 }
3332 
3333 
3340 static void
3342  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3343 {
3344  struct ConsensusSession *session = cls;
3345  ssize_t element_size;
3346  struct GNUNET_SET_Handle *initial_set;
3347  struct ConsensusElement *ce;
3348 
3349  if (GNUNET_YES == session->conclude_started)
3350  {
3351  GNUNET_break (0);
3353  return;
3354  }
3355 
3356  element_size = ntohs (msg->header.size) - sizeof(struct
3358  ce = GNUNET_malloc (sizeof(struct ConsensusElement) + element_size);
3359  GNUNET_memcpy (&ce[1], &msg[1], element_size);
3360  ce->payload_type = msg->element_type;
3361 
3362  struct GNUNET_SET_Element element = {
3364  .size = sizeof(struct ConsensusElement) + element_size,
3365  .data = ce,
3366  };
3367 
3368  {
3369  struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3370  struct SetEntry *entry;
3371 
3372  entry = lookup_set (session,
3373  &key);
3374  GNUNET_assert (NULL != entry);
3375  initial_set = entry->h;
3376  }
3377 
3378  session->num_client_insert_pending++;
3379  GNUNET_SET_add_element (initial_set,
3380  &element,
3382  session);
3383 
3384 #ifdef GNUNET_EXTRA_LOGGING
3385  {
3387  "P%u: element %s added\n",
3388  session->local_peer_idx,
3389  debug_str_element (&element));
3390  }
3391 #endif
3392  GNUNET_free (ce);
3394 }
3395 
3396 
3403 static void
3405  const struct GNUNET_MessageHeader *message)
3406 {
3407  struct ConsensusSession *session = cls;
3408 
3409  if (GNUNET_YES == session->conclude_started)
3410  {
3411  /* conclude started twice */
3412  GNUNET_break (0);
3414  return;
3415  }
3417  "conclude requested\n");
3418  session->conclude_started = GNUNET_YES;
3419  install_step_timeouts (session);
3420  run_ready_steps (session);
3422 }
3423 
3424 
3430 static void
3431 shutdown_task (void *cls)
3432 {
3434  "shutting down\n");
3435  GNUNET_STATISTICS_destroy (statistics,
3436  GNUNET_NO);
3437  statistics = NULL;
3438 }
3439 
3440 
3448 static void
3449 run (void *cls,
3450  const struct GNUNET_CONFIGURATION_Handle *c,
3452 {
3453  cfg = c;
3454  if (GNUNET_OK !=
3456  &my_peer))
3457  {
3459  "Could not retrieve host identity\n");
3461  return;
3462  }
3463  statistics = GNUNET_STATISTICS_create ("consensus",
3464  cfg);
3466  NULL);
3467 }
3468 
3469 
3478 static void *
3480  struct GNUNET_SERVICE_Client *c,
3481  struct GNUNET_MQ_Handle *mq)
3482 {
3483  struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3484 
3485  session->client = c;
3486  session->client_mq = mq;
3487  GNUNET_CONTAINER_DLL_insert (sessions_head,
3488  sessions_tail,
3489  session);
3490  return session;
3491 }
3492 
3493 
3501 static void
3503  struct GNUNET_SERVICE_Client *c,
3504  void *internal_cls)
3505 {
3506  struct ConsensusSession *session = internal_cls;
3507 
3508  if (NULL != session->set_listener)
3509  {
3511  session->set_listener = NULL;
3512  }
3513  GNUNET_CONTAINER_DLL_remove (sessions_head,
3514  sessions_tail,
3515  session);
3516 
3517  while (session->set_handles_head)
3518  {
3519  struct SetHandle *sh = session->set_handles_head;
3520  session->set_handles_head = sh->next;
3521  GNUNET_SET_destroy (sh->h);
3522  GNUNET_free (sh);
3523  }
3524  GNUNET_free (session);
3525 }
3526 
3527 
3532  ("consensus",
3534  &run,
3537  NULL,
3538  GNUNET_MQ_hd_fixed_size (client_conclude,
3540  struct GNUNET_MessageHeader,
3541  NULL),
3542  GNUNET_MQ_hd_var_size (client_insert,
3545  NULL),
3546  GNUNET_MQ_hd_var_size (client_join,
3549  NULL),
3551 
3552 /* end of gnunet-service-consensus.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
Abort a round, don&#39;t send requested elements anymore.
uint64_t first_size
Our set size from the first round.
Vote that an element should be removed.
struct FinishCls finish
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
Closure for both start_task and cancel_task.
int * peer_contested
Contestation state of the peer.
static GNUNET_NETWORK_STRUCT_END struct GNUNET_PeerIdentity me
Our own peer identity.
static int cmp_uint64_t(const void *pa, const void *pb)
struct GNUNET_CONTAINER_MultiHashMap * rfnmap
static int get_peer_idx(const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
Search peer in the list of peers in session.
struct GNUNET_SET_ListenHandle * GNUNET_SET_listen(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op_type, const struct GNUNET_HashCode *app_id, GNUNET_SET_ListenCallback listen_cb, void *listen_cls)
Wait for set operation requests for the given application ID.
Definition: set_api.c:1017
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
unsigned int subordinates_cap
int GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator&#39;s position.
int GNUNET_SET_remove_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Remove an element to the given set.
Definition: set_api.c:734
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static struct GNUNET_VPN_RedirectionRequest * request
Opaque redirection request handle.
Definition: gnunet-vpn.c:41
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:673
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:114
static void client_insert_done(void *cls)
struct GNUNET_STATISTICS_Handle * statistics
Statistics handle.
static int check_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Check join message.
static unsigned int element_size
static void try_finish_step_early(struct Step *step)
Weighted diff.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_SET_add_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Add an element to the given set.
Definition: set_api.c:687
int * peer_commited
Stores, for every peer in the session, whether the peer finished the whole referendum.
If a value with the given key exists, replace it.
Consensus element, either marker or payload.
struct GNUNET_SET_ListenHandle * set_listener
Listener for requests from other peers.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
uint16_t payload_type
Payload element_type, only valid if this is not a marker element.
static void commit_set(struct ConsensusSession *session, struct TaskEntry *task)
Commit the appropriate set for a task.
Handle for a set operation request from another peer.
Definition: set_api.c:115
struct SetHandle * set_handles_head
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
Handle to a service.
Definition: service.c:116
static void apply_diff_to_rfn(struct DiffEntry *diff, struct ReferendumEntry *rfn, uint16_t voting_peer, uint16_t num_peers)
int16_t repetition
Repetition of the gradecast phase.
static void construct_task_graph(struct ConsensusSession *session)
Element should be added to the result set of the remote peer, i.e.
static void task_start_reconcile(struct TaskEntry *task)
static void put_diff(struct ConsensusSession *session, struct DiffEntry *diff)
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
struct ReferendumEntry * rfn_create(uint16_t size)
static void task_start_grade(struct TaskEntry *task)
static void rfn_contest(struct ReferendumEntry *rfn, uint16_t contested_peer)
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1300
static void task_start_finish(struct TaskEntry *task)
Element stored in a set.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_SET_Handle * GNUNET_SET_create(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op)
Create an empty set, supporting the specified operation.
Definition: set_api.c:657
uint16_t is_contested
Non-zero if this set reconciliation had elements removed because they were contested.
Element should be added to the result set of the local peer, i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
struct GNUNET_SET_Handle * h
static struct ConsensusSession * sessions_tail
Linked list of sessions this peer participates in.
static const char * phasename(uint16_t phase)
struct GNUNET_CONTAINER_MultiHashMap * setmap
struct ConsensusSession * session
struct ConsensusSession * next
Consensus sessions are kept in a DLL.
struct DiffEntry * diff_compose(struct DiffEntry *diff_1, struct DiffEntry *diff_2)
static struct SetEntry * lookup_set(struct ConsensusSession *session, struct SetKey *key)
static struct ReferendumEntry * lookup_rfn(struct ConsensusSession *session, struct RfnKey *key)
static void arrange_peers(uint16_t *p1, uint16_t *p2, uint16_t n)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static void finish_task(struct TaskEntry *task)
static int peer_id_cmp(const void *h1, const void *h2)
Compare two peer identities.
#define GNUNET_NO
Definition: gnunet_common.h:78
unsigned int num_peers
Number of other peers in the consensus.
#define GNUNET_memdup(buf, size)
Allocate and initialize a block of memory.
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
unsigned int tasks_len
#define GNUNET_new(type)
Allocate a struct or union of the given type.
struct GNUNET_SET_OperationHandle * GNUNET_SET_prepare(const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_HashCode *app_id, const struct GNUNET_MessageHeader *context_msg, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Prepare a set operation to be evaluated with another peer.
Definition: set_api.c:813
static struct GNUNET_PeerIdentity my_peer
Peer that runs this service.
static void compute_global_id(struct ConsensusSession *session, const struct GNUNET_HashCode *local_session_id)
Compute a global, (hopefully) unique consensus session id, from the local id of the consensus session...
static struct GNUNET_SCHEDULER_Task * t
Main task.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static struct TaskEntry * lookup_task(struct ConsensusSession *session, struct TaskKey *key)
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
struct GNUNET_TIME_AbsoluteNBO start
Start time for the consensus.
Definition: consensus.h:58
struct Step * next
All steps of one session are in a linked list for easier deallocation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:526
Handle for the service.
const struct GNUNET_SET_Element * element
GNUNET_SERVICE_MAIN("consensus", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_conclude, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size(client_insert, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, struct GNUNET_CONSENSUS_ElementMessage, NULL), GNUNET_MQ_hd_var_size(client_join, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, struct GNUNET_CONSENSUS_JoinMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Last result set from a gradecast.
#define GNUNET_strdup(a)
Wrapper around GNUNET_xstrdup_.
struct GNUNET_CONTAINER_MultiHashMap * taskmap
size_t pending_prereq
Counter for the prerequisites of this step.
static struct ConsensusSession * sessions_head
Linked list of sessions this peer participates in.
void GNUNET_SET_copy_lazy(struct GNUNET_SET_Handle *set, GNUNET_SET_CopyReadyCallback cb, void *cls)
Definition: set_api.c:1194
Success, all elements have been sent (and received).
struct GNUNET_HashCode session_id
Session id of the consensus.
Definition: consensus.h:53
Internal representation of the hash map.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1246
struct SetKey input_set
#define GNUNET_NETWORK_STRUCT_BEGIN
Define as empty, GNUNET_PACKED should suffice, but this won&#39;t work on W32.
struct Step * prev
All steps of one session are in a linked list for easier deallocation.
const void * data
Actual data of the element.
Sent as context message for set reconciliation.
int * peers_blacklisted
Array of peers with length &#39;num_peers&#39;.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
static void set_copy_cb(void *cls, struct GNUNET_SET_Handle *copy)
Handle to a client that is connected to a service.
Definition: service.c:250
void GNUNET_SET_destroy(struct GNUNET_SET_Handle *set)
Destroy the set handle, and free all associated resources.
Definition: set_api.c:772
static struct DiffEntry * lookup_diff(struct ConsensusSession *session, struct DiffKey *key)
uint32_t num_peers
Number of peers (at the end of this message) that want to participate in the consensus.
Definition: consensus.h:48
struct ConsensusElement ce
static void construct_task_graph_gradecast(struct ConsensusSession *session, uint16_t rep, uint16_t lead, struct Step *step_before, struct Step *step_after)
Construct the task graph for a single gradecast.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:104
static void handle_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Called when a client wants to join a consensus session.
static void finish_step(struct Step *step)
struct GNUNET_PeerIdentity * peers
int GNUNET_asprintf(char **buf, const char *format,...)
Like asprintf, just portable.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_TIME_Absolute conclude_start
Time when the conclusion of the consensus should begin.
static void create_set_copy_for_task(struct TaskEntry *task, struct SetKey *src_set_key, struct SetKey *dst_set_key)
Call the start function of the given task again after we created a copy of the given set...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
void(* TaskFunc)(struct TaskEntry *task)
int16_t peer2
Number of the second peer in canonical order.
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
static struct Step * create_step(struct ConsensusSession *session, int round, int early_finishable)
, &#39; bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
void GNUNET_CRYPTO_hash_create_random(enum GNUNET_CRYPTO_Quality mode, struct GNUNET_HashCode *result)
Create a random hash code.
Definition: crypto_hash.c:144
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN.
Definition: consensus.h:42
struct Step ** subordinates
The other peer refused to to the operation with us, or something went wrong.
static int check_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
struct GNUNET_CONTAINER_MultiHashMap * changes
static void diff_insert(struct DiffEntry *diff, int weight, const struct GNUNET_SET_Element *element)
static void install_step_timeouts(struct ConsensusSession *session)
struct SetHandle * set_handles_tail
uint64_t lower_bound
Bounded Eppstein lower bound.
static void set_listen_cb(void *cls, const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request)
Called when another peer wants to do a set operation with the local peer.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:48
static void handle_client_conclude(void *cls, const struct GNUNET_MessageHeader *message)
Called when a client performs the conclude operation.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct TaskEntry * task
Task to finish once all changes are through.
GNUNET_SET_Status
Status for the result callback.
Message with an element.
Definition: consensus.h:72
static struct SolverHandle * sh
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
struct DiffKey key
Vote that nothing should change.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:687
struct SetKey dst_set_key
int weight
Positive weight for &#39;add&#39;, negative weights for &#39;remove&#39;.
uint16_t status
See PRISM_STATUS_*-constants.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
static uint16_t task_other_peer(struct TaskEntry *task)
static void cleanup(void *cls)
Function scheduled as very last function, cleans up after us.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration of the consensus service.
p2p message definitions for consensus
struct SetKey input_set
struct GNUNET_MessageHeader header
Type: Either GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT or GNUNET_MESSAGE_TYPE_CONSENSUS_C...
Definition: consensus.h:79
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *c, void *internal_cls)
Callback called when a client disconnected from the service.
void GNUNET_STATISTICS_set(struct GNUNET_STATISTICS_Handle *handle, const char *name, uint64_t value, int make_persistent)
Set statistic value for the peer.
Apply a repetition of the all-to-all gradecast to the current set.
A 512-bit hashcode.
Client gets notified of the required changes for both the local and the remote set.
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2324
struct ConsensusSession * prev
Consensus sessions are kept in a DLL.
char * debug_name
Human-readable name for the task, used for debugging.
Opaque handle to a set.
Definition: set_api.c:49
static const char * setname(uint16_t kind)
int GNUNET_CONFIGURATION_get_value_string(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, char **value)
Get a configuration value that should be a string.
static void task_start_eval_echo(struct TaskEntry *task)
struct TaskKey key
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static uint16_t rfn_noncontested(struct ReferendumEntry *rfn)
struct TaskEntry ** tasks
Tasks that this step is composed of.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int early_finishable
When we&#39;re doing an early finish, how should this step be treated? If GNUNET_YES, the step will be ma...
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT
Sent by service when a new element is added.
static void step_depend_on(struct Step *step, struct Step *dep)
Record dep as a dependency of step.
struct GNUNET_CONTAINER_MultiHashMap * diffmap
int is_contested
GNUNET_YES if the set resulted from applying a referendum with contested elements.
static void initialize_session_peer_list(struct ConsensusSession *session, const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
Create the sorted list of peers for the session, add the local peer if not in the join message...
struct GNUNET_HashCode global_id
Global consensus identification, computed from the session id and participating authorities.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE
Sent by client to service in order to start the consensus conclusion.
void GNUNET_SET_listen_cancel(struct GNUNET_SET_ListenHandle *lh)
Cancel the given listen operation.
Definition: set_api.c:1051
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct GNUNET_MQ_Handle * client_mq
Queued messages to the client.
Option for set operations.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
Sent by the client to the service, when the client wants the service to join a consensus session...
Definition: consensus.h:37
struct DiffEntry * diff_create()
static unsigned int num_peers
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT
Insert an element.
static void put_rfn(struct ConsensusSession *session, struct ReferendumEntry *rfn)
#define GNUNET_memcmp(a, b)
Compare memory in a and b, where both must be of the same pointer type.
unsigned int is_finished
struct DiffKey output_diff
int16_t leader
Leader in the gradecast phase.
Handle to an operation.
Definition: set_api.c:135
Handle to a message queue.
Definition: mq.c:85
A consensus session consists of one local client and the remote authorities.
Tuple of integers that together identify a task uniquely.
static void run_ready_steps(struct ConsensusSession *session)
#define GNUNET_NETWORK_STRUCT_END
Define as empty, GNUNET_PACKED should suffice, but this won&#39;t work on W32;.
unsigned int round
struct GNUNET_HashCode rand
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
static struct GNUNET_FS_PublishContext * pc
Handle to FS-publishing operation.
The identity of the host (wraps the signing key of the peer).
int GNUNET_SET_commit(struct GNUNET_SET_OperationHandle *oh, struct GNUNET_SET_Handle *set)
Commit a set to be used with a set operation.
Definition: set_api.c:1128
static const char * diffname(uint16_t kind)
uint16_t kind
A value from &#39;enum PhaseKind&#39;.
uint8_t marker
Is this a marker element?
Vote that an element should be added.
static void task_cancel_reconcile(struct TaskEntry *task)
static void set_result_cb(void *cls, const struct GNUNET_SET_Element *element, uint64_t current_size, enum GNUNET_SET_Status status)
Callback for set operation results.
#define GNUNET_PACKED
gcc-ism to get packed structs.
unsigned int finished_tasks
configuration data
Definition: configuration.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT.
struct GNUNET_SET_Handle * h
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:294
struct SetHandle * next
uint16_t element_type
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_NEW_ELEMENT.
Definition: consensus.h:84
Block type for consensus elements.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE
Sent by service to client in order to signal a completed consensus conclusion.
int16_t leader
Leader in the gradecast phase.
uint16_t size
Number of bytes in the buffer pointed to by data.
struct Step * step
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
const struct GNUNET_SET_Element * element
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
unsigned int local_peer_idx
Index of the local peer in the peers array.
unsigned int is_running
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN
Join a consensus session.
Opaque handle to a listen operation.
Definition: set_api.c:186
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
enum ReferendumVote proposal
Proposal for this element, can only be VOTE_ADD or VOTE_REMOVE.
int GNUNET_CRYPTO_get_peer_identity(const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_PeerIdentity *dst)
Retrieve the identity of the host&#39;s peer.
struct SetOpCls setop
unsigned int num_client_insert_pending
static const char * rfnname(uint16_t kind)
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *service)
Start processing consensus requests.
struct GNUNET_TIME_Absolute conclude_deadline
Timeout for all rounds together, single rounds will schedule a timeout task with a fraction of the co...
void GNUNET_SET_operation_cancel(struct GNUNET_SET_OperationHandle *oh)
Cancel the given set operation.
Definition: set_api.c:516
static void handle_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
static int send_to_client_iter(void *cls, const struct GNUNET_SET_Element *element)
Send the final result set of the consensus to the client, element by element.
struct GNUNET_SET_Element * GNUNET_SET_element_dup(const struct GNUNET_SET_Element *element)
Create a copy of an element.
Definition: set_api.c:1223
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_SET_OperationHandle * op
Header for all communications.
struct GNUNET_SERVICE_Client * client
Client that inhabits the session.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_difference(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Absolute end)
Compute the time difference between the given start and end times.
Definition: time.c:354
static void rfn_commit(struct ReferendumEntry *rfn, uint16_t commit_peer)
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:353
struct RfnKey output_rfn
static void set_mutation_done(void *cls)
union TaskFuncCls cls
struct SetHandle * prev
static void rfn_majority(const struct ReferendumEntry *rfn, const struct RfnElementInfo *ri, uint16_t *ret_majority, enum ReferendumVote *ret_vote)
For a given majority, count what the outcome is (add/remove/keep), and give the number of peers that ...
unsigned int tasks_cap
int16_t peer1
Number of the first peer in canonical order.
struct ConsensusElement ce
static void start_task(struct ConsensusSession *session, struct TaskEntry *task)
Set union, return all elements that are in at least one of the sets.
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
Definition: gnunet-abd.c:51
int GNUNET_SET_iterate(struct GNUNET_SET_Handle *set, GNUNET_SET_ElementIterator iter, void *iter_cls)
Iterate over all elements in the given set.
Definition: set_api.c:1172
unsigned int subordinates_len
struct SetKey output_set
static void task_start_apply_round(struct TaskEntry *task)
Apply the result from one round of gradecasts (i.e.
static void put_set(struct ConsensusSession *session, struct SetEntry *set)
struct GNUNET_CONTAINER_MultiHashMap * rfn_elements
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:89
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2243
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
struct GNUNET_TIME_AbsoluteNBO deadline
Deadline for conclude.
Definition: consensus.h:63
int early_stopping
State of our early stopping scheme.
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
static void rfn_vote(struct ReferendumEntry *rfn, uint16_t voting_peer, enum ReferendumVote vote, const struct GNUNET_SET_Element *element)
static void put_task(struct GNUNET_CONTAINER_MultiHashMap *taskmap, struct TaskEntry *t)
#define GNUNET_malloc(size)
Wrapper around malloc.
int16_t peer1
Number of the first peer in canonical order.
struct SetKey key
int16_t peer2
Number of the second peer in canonical order.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
struct TaskEntry * task
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
int16_t repetition
Repetition of the gradecast phase.
struct GNUNET_SET_OperationHandle * GNUNET_SET_accept(struct GNUNET_SET_Request *request, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Accept a request we got via GNUNET_SET_listen().
Definition: set_api.c:1085
struct GNUNET_SCHEDULER_Task * timeout_task
Fail set operations when the other peer shows weird behavior that might by a Byzantine fault...
uint16_t kind
A value from &#39;enum PhaseKind&#39;.