GNUnet  0.10.x
gnunet-service-consensus.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37 
38 
40 {
45  VOTE_STAY = 0,
49  VOTE_ADD = 1,
54 };
55 
56 
58 {
62 };
63 
64 
66 
71 struct TaskKey {
75  uint16_t kind GNUNET_PACKED;
76 
82 
87 
92 
99 };
100 
101 
102 
103 struct SetKey
104 {
105  int set_kind GNUNET_PACKED;
108 };
109 
110 
111 struct SetEntry
112 {
113  struct SetKey key;
121 };
122 
123 
124 struct DiffKey
125 {
126  int diff_kind GNUNET_PACKED;
129 };
130 
131 struct RfnKey
132 {
133  int rfn_kind GNUNET_PACKED;
136 };
137 
138 
140 
142 {
156 };
157 
158 
160 {
169 };
170 
172 {
177 };
178 
180 {
185 };
186 
187 
188 struct SetOpCls
189 {
190  struct SetKey input_set;
191 
192  struct SetKey output_set;
193  struct RfnKey output_rfn;
194  struct DiffKey output_diff;
195 
197 
199 
201 };
202 
203 
204 struct FinishCls
205 {
206  struct SetKey input_set;
207 };
208 
214 {
215  struct SetOpCls setop;
216  struct FinishCls finish;
217 };
218 
219 struct TaskEntry;
220 
221 typedef void (*TaskFunc) (struct TaskEntry *task);
222 
223 /*
224  * Node in the consensus task graph.
225  */
226 struct TaskEntry
227 {
228  struct TaskKey key;
229 
230  struct Step *step;
231 
233 
235 
238 
239  union TaskFuncCls cls;
240 };
241 
242 
243 struct Step
244 {
249  struct Step *prev;
250 
255  struct Step *next;
256 
258 
262  struct TaskEntry **tasks;
263  unsigned int tasks_len;
264  unsigned int tasks_cap;
265 
266  unsigned int finished_tasks;
267 
268  /*
269  * Tasks that have this task as dependency.
270  *
271  * We store pointers to subordinates rather
272  * than to prerequisites since it makes
273  * tracking the readiness of a task easier.
274  */
275  struct Step **subordinates;
276  unsigned int subordinates_len;
277  unsigned int subordinates_cap;
278 
284 
285  /*
286  * Task that will run this step despite
287  * any pending prerequisites.
288  */
290 
291  unsigned int is_running;
292 
293  unsigned int is_finished;
294 
295  /*
296  * Synchrony round of the task.
297  * Determines the deadline for the task.
298  */
299  unsigned int round;
300 
305  char *debug_name;
306 
319 };
320 
321 
323 {
325 
326  /*
327  * GNUNET_YES if the peer votes for the proposal.
328  */
329  int *votes;
330 
335  enum ReferendumVote proposal;
336 };
337 
338 
340 {
341  struct RfnKey key;
342 
343  /*
344  * Elements where there is at least one proposed change.
345  *
346  * Maps the hash of the GNUNET_SET_Element
347  * to 'struct RfnElementInfo'.
348  */
350 
351  unsigned int num_peers;
352 
364 
365 
372 };
373 
374 
376 {
378 
383  int weight;
384 };
385 
386 
390 struct DiffEntry
391 {
392  struct DiffKey key;
394 };
395 
396 struct SetHandle
397 {
398  struct SetHandle *prev;
399  struct SetHandle *next;
400 
402 };
403 
404 
405 
410 {
415 
420 
422 
426 
431 
432  /*
433  * Mapping from (hashed) TaskKey to TaskEntry.
434  *
435  * We map the application_id for a round to the task that should be
436  * executed, so we don't have to go through all task whenever we get
437  * an incoming set op request.
438  */
440 
441  struct Step *steps_head;
442  struct Step *steps_tail;
443 
445 
447 
452  struct GNUNET_HashCode global_id;
453 
458 
463 
467  struct GNUNET_TIME_Absolute conclude_start;
468 
474  struct GNUNET_TIME_Absolute conclude_deadline;
475 
477 
481  unsigned int num_peers;
482 
486  unsigned int local_peer_idx;
487 
493 
498 
502  uint64_t first_size;
503 
505 
509  uint64_t lower_bound;
510 
513 };
514 
519 
524 
528 static const struct GNUNET_CONFIGURATION_Handle *cfg;
529 
534 
539 
540 
541 static void
542 finish_task (struct TaskEntry *task);
543 
544 
545 static void
546 run_ready_steps (struct ConsensusSession *session);
547 
548 
549 static const char *
550 phasename (uint16_t phase)
551 {
552  switch (phase)
553  {
554  case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
555  case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
556  case PHASE_KIND_FINISH: return "FINISH";
557  case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
558  case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
559  case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
560  case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
561  case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
562  case PHASE_KIND_APPLY_REP: return "APPLY_REP";
563  default: return "(unknown)";
564  }
565 }
566 
567 
568 static const char *
569 setname (uint16_t kind)
570 {
571  switch (kind)
572  {
573  case SET_KIND_CURRENT: return "CURRENT";
574  case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
575  case SET_KIND_NONE: return "NONE";
576  default: return "(unknown)";
577  }
578 }
579 
580 static const char *
581 rfnname (uint16_t kind)
582 {
583  switch (kind)
584  {
585  case RFN_KIND_NONE: return "NONE";
586  case RFN_KIND_ECHO: return "ECHO";
587  case RFN_KIND_CONFIRM: return "CONFIRM";
588  default: return "(unknown)";
589  }
590 }
591 
592 static const char *
593 diffname (uint16_t kind)
594 {
595  switch (kind)
596  {
597  case DIFF_KIND_NONE: return "NONE";
598  case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
599  case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
600  case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
601  default: return "(unknown)";
602  }
603 }
604 
605 #ifdef GNUNET_EXTRA_LOGGING
606 
607 
608 static const char *
609 debug_str_element (const struct GNUNET_SET_Element *el)
610 {
611  struct GNUNET_HashCode hash;
612 
613  GNUNET_SET_element_hash (el, &hash);
614 
615  return GNUNET_h2s (&hash);
616 }
617 
618 static const char *
619 debug_str_task_key (struct TaskKey *tk)
620 {
621  static char buf[256];
622 
623  snprintf (buf, sizeof (buf),
624  "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
625  phasename (tk->kind), tk->peer1, tk->peer2,
626  tk->leader, tk->repetition);
627 
628  return buf;
629 }
630 
631 static const char *
632 debug_str_diff_key (struct DiffKey *dk)
633 {
634  static char buf[256];
635 
636  snprintf (buf, sizeof (buf),
637  "DiffKey kind=%s, k1=%d, k2=%d",
638  diffname (dk->diff_kind), dk->k1, dk->k2);
639 
640  return buf;
641 }
642 
643 static const char *
644 debug_str_set_key (const struct SetKey *sk)
645 {
646  static char buf[256];
647 
648  snprintf (buf, sizeof (buf),
649  "SetKey kind=%s, k1=%d, k2=%d",
650  setname (sk->set_kind), sk->k1, sk->k2);
651 
652  return buf;
653 }
654 
655 
656 static const char *
657 debug_str_rfn_key (const struct RfnKey *rk)
658 {
659  static char buf[256];
660 
661  snprintf (buf, sizeof (buf),
662  "RfnKey kind=%s, k1=%d, k2=%d",
663  rfnname (rk->rfn_kind), rk->k1, rk->k2);
664 
665  return buf;
666 }
667 
668 #endif /* GNUNET_EXTRA_LOGGING */
669 
670 
680 static int
682  const struct GNUNET_SET_Element *element)
683 {
684  struct TaskEntry *task = (struct TaskEntry *) cls;
685  struct ConsensusSession *session = task->step->session;
686  struct GNUNET_MQ_Envelope *ev;
687 
688  if (NULL != element)
689  {
691  const struct ConsensusElement *ce;
692 
694  ce = element->data;
695 
696  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned) ce->marker);
697 
698  if (0 != ce->marker)
699  return GNUNET_YES;
700 
702  "P%d: sending element %s to client\n",
703  session->local_peer_idx,
704  debug_str_element (element));
705 
706  ev = GNUNET_MQ_msg_extra (m, element->size - sizeof (struct ConsensusElement),
708  m->element_type = ce->payload_type;
709  GNUNET_memcpy (&m[1], &ce[1], element->size - sizeof (struct ConsensusElement));
710  GNUNET_MQ_send (session->client_mq, ev);
711  }
712  else
713  {
715  "P%d: finished iterating elements for client\n",
716  session->local_peer_idx);
718  GNUNET_MQ_send (session->client_mq, ev);
719  }
720  return GNUNET_YES;
721 }
722 
723 
724 static struct SetEntry *
725 lookup_set (struct ConsensusSession *session, struct SetKey *key)
726 {
727  struct GNUNET_HashCode hash;
728 
730  "P%u: looking up set {%s}\n",
731  session->local_peer_idx,
732  debug_str_set_key (key));
733 
735  GNUNET_CRYPTO_hash (key, sizeof (struct SetKey), &hash);
736  return GNUNET_CONTAINER_multihashmap_get (session->setmap, &hash);
737 }
738 
739 
740 static struct DiffEntry *
741 lookup_diff (struct ConsensusSession *session, struct DiffKey *key)
742 {
743  struct GNUNET_HashCode hash;
744 
746  "P%u: looking up diff {%s}\n",
747  session->local_peer_idx,
748  debug_str_diff_key (key));
749 
751  GNUNET_CRYPTO_hash (key, sizeof (struct DiffKey), &hash);
752  return GNUNET_CONTAINER_multihashmap_get (session->diffmap, &hash);
753 }
754 
755 
756 static struct ReferendumEntry *
757 lookup_rfn (struct ConsensusSession *session, struct RfnKey *key)
758 {
759  struct GNUNET_HashCode hash;
760 
762  "P%u: looking up rfn {%s}\n",
763  session->local_peer_idx,
764  debug_str_rfn_key (key));
765 
767  GNUNET_CRYPTO_hash (key, sizeof (struct RfnKey), &hash);
768  return GNUNET_CONTAINER_multihashmap_get (session->rfnmap, &hash);
769 }
770 
771 
772 static void
773 diff_insert (struct DiffEntry *diff,
774  int weight,
775  const struct GNUNET_SET_Element *element)
776 {
777  struct DiffElementInfo *di;
778  struct GNUNET_HashCode hash;
779 
780  GNUNET_assert ( (1 == weight) || (-1 == weight));
781 
783  "diff_insert with element size %u\n",
784  element->size);
785 
787  "hashing element\n");
788 
789  GNUNET_SET_element_hash (element, &hash);
790 
792  "hashed element\n");
793 
794  di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
795 
796  if (NULL == di)
797  {
798  di = GNUNET_new (struct DiffElementInfo);
799  di->element = GNUNET_SET_element_dup (element);
802  &hash, di,
804  }
805 
806  di->weight = weight;
807 }
808 
809 
810 static void
812  uint16_t commit_peer)
813 {
814  GNUNET_assert (commit_peer < rfn->num_peers);
815 
816  rfn->peer_commited[commit_peer] = GNUNET_YES;
817 }
818 
819 
820 static void
822  uint16_t contested_peer)
823 {
824  GNUNET_assert (contested_peer < rfn->num_peers);
825 
826  rfn->peer_contested[contested_peer] = GNUNET_YES;
827 }
828 
829 
830 static uint16_t
832 {
833  uint16_t i;
834  uint16_t ret;
835 
836  ret = 0;
837  for (i = 0; i < rfn->num_peers; i++)
838  if ( (GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]) )
839  ret++;
840 
841  return ret;
842 }
843 
844 
845 static void
847  uint16_t voting_peer,
848  enum ReferendumVote vote,
849  const struct GNUNET_SET_Element *element)
850 {
851  struct RfnElementInfo *ri;
852  struct GNUNET_HashCode hash;
853 
854  GNUNET_assert (voting_peer < rfn->num_peers);
855 
856  /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
857  since VOTE_KEEP is implicit in not voting. */
858  GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
859 
860  GNUNET_SET_element_hash (element, &hash);
862 
863  if (NULL == ri)
864  {
865  ri = GNUNET_new (struct RfnElementInfo);
866  ri->element = GNUNET_SET_element_dup (element);
867  ri->votes = GNUNET_new_array (rfn->num_peers, int);
870  &hash, ri,
872  }
873 
874  ri->votes[voting_peer] = GNUNET_YES;
875  ri->proposal = vote;
876 }
877 
878 
879 static uint16_t
881 {
882  uint16_t me = task->step->session->local_peer_idx;
883  if (task->key.peer1 == me)
884  return task->key.peer2;
885  return task->key.peer1;
886 }
887 
888 
889 static int
890 cmp_uint64_t (const void *pa, const void *pb)
891 {
892  uint64_t a = *(uint64_t *) pa;
893  uint64_t b = *(uint64_t *) pb;
894 
895  if (a == b)
896  return 0;
897  if (a < b)
898  return -1;
899  return 1;
900 }
901 
902 
912 static void
913 set_result_cb (void *cls,
914  const struct GNUNET_SET_Element *element,
915  uint64_t current_size,
917 {
918  struct TaskEntry *task = cls;
919  struct ConsensusSession *session = task->step->session;
920  struct SetEntry *output_set = NULL;
921  struct DiffEntry *output_diff = NULL;
922  struct ReferendumEntry *output_rfn = NULL;
923  unsigned int other_idx;
924  struct SetOpCls *setop;
925  const struct ConsensusElement *consensus_element = NULL;
926 
927  if (NULL != element)
928  {
930  "P%u: got element of type %u, status %u\n",
931  session->local_peer_idx,
932  (unsigned) element->element_type,
933  (unsigned) status);
935  consensus_element = element->data;
936  }
937 
938  setop = &task->cls.setop;
939 
940 
942  "P%u: got set result for {%s}, status %u\n",
943  session->local_peer_idx,
944  debug_str_task_key (&task->key),
945  status);
946 
947  if (GNUNET_NO == task->is_started)
948  {
949  GNUNET_break_op (0);
950  return;
951  }
952 
953  if (GNUNET_YES == task->is_finished)
954  {
955  GNUNET_break_op (0);
956  return;
957  }
958 
959  other_idx = task_other_peer (task);
960 
961  if (SET_KIND_NONE != setop->output_set.set_kind)
962  {
963  output_set = lookup_set (session, &setop->output_set);
964  GNUNET_assert (NULL != output_set);
965  }
966 
967  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
968  {
969  output_diff = lookup_diff (session, &setop->output_diff);
970  GNUNET_assert (NULL != output_diff);
971  }
972 
973  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
974  {
975  output_rfn = lookup_rfn (session, &setop->output_rfn);
976  GNUNET_assert (NULL != output_rfn);
977  }
978 
979  if (GNUNET_YES == session->peers_blacklisted[other_idx])
980  {
981  /* Peer might have been blacklisted
982  by a gradecast running in parallel, ignore elements from now */
983  if (GNUNET_SET_STATUS_ADD_LOCAL == status)
984  return;
985  if (GNUNET_SET_STATUS_ADD_REMOTE == status)
986  return;
987  }
988 
989  if ( (NULL != consensus_element) && (0 != consensus_element->marker) )
990  {
992  "P%u: got some marker\n",
993  session->local_peer_idx);
994  if ( (GNUNET_YES == setop->transceive_contested) &&
995  (CONSENSUS_MARKER_CONTESTED == consensus_element->marker) )
996  {
997  GNUNET_assert (NULL != output_rfn);
998  rfn_contest (output_rfn, task_other_peer (task));
999  return;
1000  }
1001 
1002  if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1003  {
1004 
1006  "P%u: got size marker\n",
1007  session->local_peer_idx);
1008 
1009 
1010  struct ConsensusSizeElement *cse = (void *) consensus_element;
1011 
1012  if (cse->sender_index == other_idx)
1013  {
1014  if (NULL == session->first_sizes_received)
1015  session->first_sizes_received = GNUNET_new_array (session->num_peers, uint64_t);
1016  session->first_sizes_received[other_idx] = GNUNET_ntohll (cse->size);
1017 
1018  uint64_t *copy = GNUNET_memdup (session->first_sizes_received, sizeof (uint64_t) * session->num_peers);
1019  qsort (copy, session->num_peers, sizeof (uint64_t), cmp_uint64_t);
1020  session->lower_bound = copy[session->num_peers / 3 + 1];
1022  "P%u: lower bound %llu\n",
1023  session->local_peer_idx,
1024  (long long) session->lower_bound);
1025  GNUNET_free (copy);
1026  }
1027  return;
1028  }
1029 
1030  return;
1031  }
1032 
1033  switch (status)
1034  {
1036  GNUNET_assert (NULL != consensus_element);
1038  "Adding element in Task {%s}\n",
1039  debug_str_task_key (&task->key));
1040  if (NULL != output_set)
1041  {
1042  // FIXME: record pending adds, use callback
1043  GNUNET_SET_add_element (output_set->h,
1044  element,
1045  NULL,
1046  NULL);
1047 #ifdef GNUNET_EXTRA_LOGGING
1049  "P%u: adding element %s into set {%s} of task {%s}\n",
1050  session->local_peer_idx,
1051  debug_str_element (element),
1052  debug_str_set_key (&setop->output_set),
1053  debug_str_task_key (&task->key));
1054 #endif
1055  }
1056  if (NULL != output_diff)
1057  {
1058  diff_insert (output_diff, 1, element);
1059 #ifdef GNUNET_EXTRA_LOGGING
1061  "P%u: adding element %s into diff {%s} of task {%s}\n",
1062  session->local_peer_idx,
1063  debug_str_element (element),
1064  debug_str_diff_key (&setop->output_diff),
1065  debug_str_task_key (&task->key));
1066 #endif
1067  }
1068  if (NULL != output_rfn)
1069  {
1070  rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
1071 #ifdef GNUNET_EXTRA_LOGGING
1073  "P%u: adding element %s into rfn {%s} of task {%s}\n",
1074  session->local_peer_idx,
1075  debug_str_element (element),
1076  debug_str_rfn_key (&setop->output_rfn),
1077  debug_str_task_key (&task->key));
1078 #endif
1079  }
1080  // XXX: add result to structures in task
1081  break;
1083  GNUNET_assert (NULL != consensus_element);
1084  if (GNUNET_YES == setop->do_not_remove)
1085  break;
1086  if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1087  break;
1089  "Removing element in Task {%s}\n",
1090  debug_str_task_key (&task->key));
1091  if (NULL != output_set)
1092  {
1093  // FIXME: record pending adds, use callback
1094  GNUNET_SET_remove_element (output_set->h,
1095  element,
1096  NULL,
1097  NULL);
1098 #ifdef GNUNET_EXTRA_LOGGING
1100  "P%u: removing element %s from set {%s} of task {%s}\n",
1101  session->local_peer_idx,
1102  debug_str_element (element),
1103  debug_str_set_key (&setop->output_set),
1104  debug_str_task_key (&task->key));
1105 #endif
1106  }
1107  if (NULL != output_diff)
1108  {
1109  diff_insert (output_diff, -1, element);
1110 #ifdef GNUNET_EXTRA_LOGGING
1112  "P%u: removing element %s from diff {%s} of task {%s}\n",
1113  session->local_peer_idx,
1114  debug_str_element (element),
1115  debug_str_diff_key (&setop->output_diff),
1116  debug_str_task_key (&task->key));
1117 #endif
1118  }
1119  if (NULL != output_rfn)
1120  {
1121  rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
1122 #ifdef GNUNET_EXTRA_LOGGING
1124  "P%u: removing element %s from rfn {%s} of task {%s}\n",
1125  session->local_peer_idx,
1126  debug_str_element (element),
1127  debug_str_rfn_key (&setop->output_rfn),
1128  debug_str_task_key (&task->key));
1129 #endif
1130  }
1131  break;
1133  // XXX: check first if any changes to the underlying
1134  // set are still pending
1136  "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1137  session->local_peer_idx,
1138  debug_str_task_key (&task->key),
1139  (unsigned int) task->step->finished_tasks,
1140  (unsigned int) task->step->tasks_len);
1141  if (NULL != output_rfn)
1142  {
1143  rfn_commit (output_rfn, task_other_peer (task));
1144  }
1145  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1146  {
1147  session->first_size = current_size;
1148  }
1149  finish_task (task);
1150  break;
1152  // XXX: cleanup
1153  GNUNET_break_op (0);
1154  finish_task (task);
1155  return;
1156  default:
1157  /* not reached */
1158  GNUNET_assert (0);
1159  }
1160 }
1161 
1162 #ifdef EVIL
1163 
1164 enum EvilnessType
1165 {
1166  EVILNESS_NONE,
1167  EVILNESS_CRAM_ALL,
1168  EVILNESS_CRAM_LEAD,
1169  EVILNESS_CRAM_ECHO,
1170  EVILNESS_SLACK,
1171  EVILNESS_SLACK_A2A,
1172 };
1173 
1174 enum EvilnessSubType
1175 {
1176  EVILNESS_SUB_NONE,
1177  EVILNESS_SUB_REPLACEMENT,
1178  EVILNESS_SUB_NO_REPLACEMENT,
1179 };
1180 
1181 struct Evilness
1182 {
1183  enum EvilnessType type;
1184  enum EvilnessSubType subtype;
1185  unsigned int num;
1186 };
1187 
1188 
1189 static int
1190 parse_evilness_cram_subtype (const char *evil_subtype_str, struct Evilness *evil)
1191 {
1192  if (0 == strcmp ("replace", evil_subtype_str))
1193  {
1194  evil->subtype = EVILNESS_SUB_REPLACEMENT;
1195  }
1196  else if (0 == strcmp ("noreplace", evil_subtype_str))
1197  {
1198  evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1199  }
1200  else
1201  {
1203  "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1204  evil_subtype_str);
1205  return GNUNET_SYSERR;
1206  }
1207  return GNUNET_OK;
1208 }
1209 
1210 
1211 static void
1212 get_evilness (struct ConsensusSession *session, struct Evilness *evil)
1213 {
1214  char *evil_spec;
1215  char *field;
1216  char *evil_type_str = NULL;
1217  char *evil_subtype_str = NULL;
1218 
1219  GNUNET_assert (NULL != evil);
1220 
1221  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
1222  {
1224  "P%u: no evilness\n",
1225  session->local_peer_idx);
1226  evil->type = EVILNESS_NONE;
1227  return;
1228  }
1230  "P%u: got evilness spec\n",
1231  session->local_peer_idx);
1232 
1233  for (field = strtok (evil_spec, "/");
1234  NULL != field;
1235  field = strtok (NULL, "/"))
1236  {
1237  unsigned int peer_num;
1238  unsigned int evil_num;
1239  int ret;
1240 
1241  evil_type_str = NULL;
1242  evil_subtype_str = NULL;
1243 
1244  ret = sscanf (field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1245 
1246  if (ret != 4)
1247  {
1249  "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1250  field,
1251  ret);
1252  goto not_evil;
1253  }
1254 
1255  GNUNET_assert (NULL != evil_type_str);
1256  GNUNET_assert (NULL != evil_subtype_str);
1257 
1258  if (peer_num == session->local_peer_idx)
1259  {
1260  if (0 == strcmp ("slack", evil_type_str))
1261  {
1262  evil->type = EVILNESS_SLACK;
1263  }
1264  if (0 == strcmp ("slack-a2a", evil_type_str))
1265  {
1266  evil->type = EVILNESS_SLACK_A2A;
1267  }
1268  else if (0 == strcmp ("cram-all", evil_type_str))
1269  {
1270  evil->type = EVILNESS_CRAM_ALL;
1271  evil->num = evil_num;
1272  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1273  goto not_evil;
1274  }
1275  else if (0 == strcmp ("cram-lead", evil_type_str))
1276  {
1277  evil->type = EVILNESS_CRAM_LEAD;
1278  evil->num = evil_num;
1279  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1280  goto not_evil;
1281  }
1282  else if (0 == strcmp ("cram-echo", evil_type_str))
1283  {
1284  evil->type = EVILNESS_CRAM_ECHO;
1285  evil->num = evil_num;
1286  if (GNUNET_OK != parse_evilness_cram_subtype (evil_subtype_str, evil))
1287  goto not_evil;
1288  }
1289  else
1290  {
1292  "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1293  evil_type_str);
1294  goto not_evil;
1295  }
1296  goto cleanup;
1297  }
1298  /* No GNUNET_free since memory was allocated by libc */
1299  free (evil_type_str);
1300  evil_type_str = NULL;
1301  evil_subtype_str = NULL;
1302  }
1303 not_evil:
1304  evil->type = EVILNESS_NONE;
1305 cleanup:
1306  GNUNET_free (evil_spec);
1307  /* no GNUNET_free_non_null since it wasn't
1308  * allocated with GNUNET_malloc */
1309  if (NULL != evil_type_str)
1310  free (evil_type_str);
1311  if (NULL != evil_subtype_str)
1312  free (evil_subtype_str);
1313 }
1314 
1315 #endif
1316 
1317 
1322 static void
1323 commit_set (struct ConsensusSession *session,
1324  struct TaskEntry *task)
1325 {
1326  struct SetEntry *set;
1327  struct SetOpCls *setop = &task->cls.setop;
1328 
1329  GNUNET_assert (NULL != setop->op);
1330  set = lookup_set (session, &setop->input_set);
1331  GNUNET_assert (NULL != set);
1332 
1333  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
1334  {
1335  struct GNUNET_SET_Element element;
1336  struct ConsensusElement ce = { 0 };
1338  element.data = &ce;
1339  element.size = sizeof (struct ConsensusElement);
1341  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1342  }
1343 
1344  if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1345  {
1346  struct GNUNET_SET_Element element;
1347  struct ConsensusSizeElement cse = {
1348  .size = 0,
1349  .sender_index = 0
1350  };
1351  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1353  cse.size = GNUNET_htonll (session->first_size);
1354  cse.sender_index = session->local_peer_idx;
1355  element.data = &cse;
1356  element.size = sizeof (struct ConsensusSizeElement);
1358  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1359  }
1360 
1361 #ifdef EVIL
1362  {
1363  unsigned int i;
1364  struct Evilness evil;
1365 
1366  get_evilness (session, &evil);
1367  if (EVILNESS_NONE != evil.type)
1368  {
1369  /* Useful for evaluation */
1370  GNUNET_STATISTICS_set (statistics,
1371  "is evil",
1372  1,
1373  GNUNET_NO);
1374  }
1375  switch (evil.type)
1376  {
1377  case EVILNESS_CRAM_ALL:
1378  case EVILNESS_CRAM_LEAD:
1379  case EVILNESS_CRAM_ECHO:
1380  /* We're not cramming elements in the
1381  all-to-all round, since that would just
1382  add more elements to the result set, but
1383  wouldn't test robustness. */
1384  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1385  {
1386  GNUNET_SET_commit (setop->op, set->h);
1387  break;
1388  }
1389  if ((EVILNESS_CRAM_LEAD == evil.type) &&
1390  ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1391  {
1392  GNUNET_SET_commit (setop->op, set->h);
1393  break;
1394  }
1395  if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1396  {
1397  GNUNET_SET_commit (setop->op, set->h);
1398  break;
1399  }
1400  for (i = 0; i < evil.num; i++)
1401  {
1402  struct GNUNET_SET_Element element;
1403  struct ConsensusStuffedElement se = {
1404  .ce.payload_type = 0,
1405  .ce.marker = 0,
1406  };
1407  element.data = &se;
1408  element.size = sizeof (struct ConsensusStuffedElement);
1410 
1411  if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1412  {
1413  /* Always generate a new element. */
1415  }
1416  else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1417  {
1418  /* Always cram the same elements, derived from counter. */
1419  GNUNET_CRYPTO_hash (&i, sizeof (i), &se.rand);
1420  }
1421  else
1422  {
1423  GNUNET_assert (0);
1424  }
1425  GNUNET_SET_add_element (set->h, &element, NULL, NULL);
1426 #ifdef GNUNET_EXTRA_LOGGING
1428  "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1429  session->local_peer_idx,
1430  debug_str_element (&element),
1431  debug_str_set_key (&setop->input_set),
1432  debug_str_task_key (&task->key));
1433 #endif
1434  }
1435  GNUNET_STATISTICS_update (statistics,
1436  "# stuffed elements",
1437  evil.num,
1438  GNUNET_NO);
1439  GNUNET_SET_commit (setop->op, set->h);
1440  break;
1441  case EVILNESS_SLACK:
1443  "P%u: evil peer: slacking\n",
1444  (unsigned int) session->local_peer_idx);
1445  /* Do nothing. */
1446  case EVILNESS_SLACK_A2A:
1447  if ( (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind ) ||
1448  (PHASE_KIND_ALL_TO_ALL == task->key.kind) )
1449  {
1450  struct GNUNET_SET_Handle *empty_set;
1451  empty_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
1452  GNUNET_SET_commit (setop->op, empty_set);
1453  GNUNET_SET_destroy (empty_set);
1454  }
1455  else
1456  {
1457  GNUNET_SET_commit (setop->op, set->h);
1458  }
1459  break;
1460  case EVILNESS_NONE:
1461  GNUNET_SET_commit (setop->op, set->h);
1462  break;
1463  }
1464  }
1465 #else
1466  if (GNUNET_NO == session->peers_blacklisted[task_other_peer (task)])
1467  {
1468  GNUNET_SET_commit (setop->op, set->h);
1469  }
1470  else
1471  {
1472  /* For our testcases, we don't want the blacklisted
1473  peers to wait. */
1475  setop->op = NULL;
1476  finish_task (task);
1477  }
1478 #endif
1479 }
1480 
1481 
1482 static void
1483 put_diff (struct ConsensusSession *session,
1484  struct DiffEntry *diff)
1485 {
1486  struct GNUNET_HashCode hash;
1487 
1488  GNUNET_assert (NULL != diff);
1489 
1490  GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
1492  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
1494 }
1495 
1496 static void
1497 put_set (struct ConsensusSession *session,
1498  struct SetEntry *set)
1499 {
1500  struct GNUNET_HashCode hash;
1501 
1502  GNUNET_assert (NULL != set->h);
1503 
1505  "Putting set %s\n",
1506  debug_str_set_key (&set->key));
1507 
1508  GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
1510  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
1512 }
1513 
1514 
1515 static void
1516 put_rfn (struct ConsensusSession *session,
1517  struct ReferendumEntry *rfn)
1518 {
1519  struct GNUNET_HashCode hash;
1520 
1521  GNUNET_CRYPTO_hash (&rfn->key, sizeof (struct RfnKey), &hash);
1523  GNUNET_CONTAINER_multihashmap_put (session->rfnmap, &hash, rfn,
1525 }
1526 
1527 
1528 
1529 static void
1531 {
1532  /* not implemented yet */
1533  GNUNET_assert (0);
1534 }
1535 
1536 
1537 static void
1539  struct ReferendumEntry *rfn,
1540  uint16_t voting_peer,
1541  uint16_t num_peers)
1542 {
1544  struct DiffElementInfo *di;
1545 
1547 
1548  while (GNUNET_YES ==
1550  NULL,
1551  (const void **) &di))
1552  {
1553  if (di->weight > 0)
1554  {
1555  rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
1556  }
1557  if (di->weight < 0)
1558  {
1559  rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
1560  }
1561  }
1562 
1564 }
1565 
1566 
1567 struct DiffEntry *
1569 {
1570  struct DiffEntry *d = GNUNET_new (struct DiffEntry);
1571 
1573 
1574  return d;
1575 }
1576 
1577 
1578 struct DiffEntry *
1579 diff_compose (struct DiffEntry *diff_1,
1580  struct DiffEntry *diff_2)
1581 {
1582  struct DiffEntry *diff_new;
1584  struct DiffElementInfo *di;
1585 
1586  diff_new = diff_create ();
1587 
1589  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1590  {
1591  diff_insert (diff_new, di->weight, di->element);
1592  }
1594 
1596  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
1597  {
1598  diff_insert (diff_new, di->weight, di->element);
1599  }
1601 
1602  return diff_new;
1603 }
1604 
1605 
1606 struct ReferendumEntry *
1607 rfn_create (uint16_t size)
1608 {
1609  struct ReferendumEntry *rfn;
1610 
1611  rfn = GNUNET_new (struct ReferendumEntry);
1613  rfn->peer_commited = GNUNET_new_array (size, int);
1614  rfn->peer_contested = GNUNET_new_array (size, int);
1615  rfn->num_peers = size;
1616 
1617  return rfn;
1618 }
1619 
1620 
1621 #if UNUSED
1622 static void
1623 diff_destroy (struct DiffEntry *diff)
1624 {
1626  GNUNET_free (diff);
1627 }
1628 #endif
1629 
1630 
1636 static void
1637 rfn_majority (const struct ReferendumEntry *rfn,
1638  const struct RfnElementInfo *ri,
1639  uint16_t *ret_majority,
1640  enum ReferendumVote *ret_vote)
1641 {
1642  uint16_t votes_yes = 0;
1643  uint16_t num_commited = 0;
1644  uint16_t i;
1645 
1647  "Computing rfn majority for element %s of rfn {%s}\n",
1648  debug_str_element (ri->element),
1649  debug_str_rfn_key (&rfn->key));
1650 
1651  for (i = 0; i < rfn->num_peers; i++)
1652  {
1653  if (GNUNET_NO == rfn->peer_commited[i])
1654  continue;
1655  num_commited++;
1656 
1657  if (GNUNET_YES == ri->votes[i])
1658  votes_yes++;
1659  }
1660 
1661  if (votes_yes > (num_commited) / 2)
1662  {
1663  *ret_vote = ri->proposal;
1664  *ret_majority = votes_yes;
1665  }
1666  else
1667  {
1668  *ret_vote = VOTE_STAY;
1669  *ret_majority = num_commited - votes_yes;
1670  }
1671 }
1672 
1673 
1675 {
1676  struct TaskEntry *task;
1677  struct SetKey dst_set_key;
1678 };
1679 
1680 
1681 static void
1682 set_copy_cb (void *cls, struct GNUNET_SET_Handle *copy)
1683 {
1684  struct SetCopyCls *scc = cls;
1685  struct TaskEntry *task = scc->task;
1686  struct SetKey dst_set_key = scc->dst_set_key;
1687  struct SetEntry *set;
1688  struct SetHandle *sh = GNUNET_new (struct SetHandle);
1689 
1690  sh->h = copy;
1692  task->step->session->set_handles_tail,
1693  sh);
1694 
1695  GNUNET_free (scc);
1696  set = GNUNET_new (struct SetEntry);
1697  set->h = copy;
1698  set->key = dst_set_key;
1699  put_set (task->step->session, set);
1700 
1701  task->start (task);
1702 }
1703 
1704 
1709 static void
1711  struct SetKey *src_set_key,
1712  struct SetKey *dst_set_key)
1713 {
1714  struct SetEntry *src_set;
1715  struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
1716 
1718  "Copying set {%s} to {%s} for task {%s}\n",
1719  debug_str_set_key (src_set_key),
1720  debug_str_set_key (dst_set_key),
1721  debug_str_task_key (&task->key));
1722 
1723  scc->task = task;
1724  scc->dst_set_key = *dst_set_key;
1725  src_set = lookup_set (task->step->session, src_set_key);
1726  GNUNET_assert (NULL != src_set);
1727  GNUNET_SET_copy_lazy (src_set->h,
1728  set_copy_cb,
1729  scc);
1730 }
1731 
1732 
1734 {
1739  struct TaskEntry *task;
1740 };
1741 
1742 
1743 static void
1745 {
1746  struct SetMutationProgressCls *pc = cls;
1747 
1748  GNUNET_assert (pc->num_pending > 0);
1749 
1750  pc->num_pending--;
1751 
1752  if (0 == pc->num_pending)
1753  {
1754  struct TaskEntry *task = pc->task;
1755  GNUNET_free (pc);
1756  finish_task (task);
1757  }
1758 }
1759 
1760 
1761 static void
1763 {
1764  unsigned int i;
1765 
1766  if (GNUNET_YES == step->is_running)
1767  return;
1768  if (GNUNET_YES == step->is_finished)
1769  return;
1770  if (GNUNET_NO == step->early_finishable)
1771  return;
1772 
1773  step->is_finished = GNUNET_YES;
1774 
1775 #ifdef GNUNET_EXTRA_LOGGING
1777  "Finishing step `%s' early.\n",
1778  step->debug_name);
1779 #endif
1780 
1781  for (i = 0; i < step->subordinates_len; i++)
1782  {
1783  GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1784  step->subordinates[i]->pending_prereq--;
1785 #ifdef GNUNET_EXTRA_LOGGING
1787  "Decreased pending_prereq to %u for step `%s'.\n",
1788  (unsigned int) step->subordinates[i]->pending_prereq,
1789  step->subordinates[i]->debug_name);
1790 
1791 #endif
1793  }
1794 
1795  // XXX: maybe schedule as task to avoid recursion?
1796  run_ready_steps (step->session);
1797 }
1798 
1799 
1800 static void
1802 {
1803  unsigned int i;
1804 
1805  GNUNET_assert (step->finished_tasks == step->tasks_len);
1806  GNUNET_assert (GNUNET_YES == step->is_running);
1807  GNUNET_assert (GNUNET_NO == step->is_finished);
1808 
1809 #ifdef GNUNET_EXTRA_LOGGING
1811  "All tasks of step `%s' with %u subordinates finished.\n",
1812  step->debug_name,
1813  step->subordinates_len);
1814 #endif
1815 
1816  for (i = 0; i < step->subordinates_len; i++)
1817  {
1818  GNUNET_assert (step->subordinates[i]->pending_prereq > 0);
1819  step->subordinates[i]->pending_prereq--;
1820 #ifdef GNUNET_EXTRA_LOGGING
1822  "Decreased pending_prereq to %u for step `%s'.\n",
1823  (unsigned int) step->subordinates[i]->pending_prereq,
1824  step->subordinates[i]->debug_name);
1825 
1826 #endif
1827  }
1828 
1829  step->is_finished = GNUNET_YES;
1830 
1831  // XXX: maybe schedule as task to avoid recursion?
1832  run_ready_steps (step->session);
1833 }
1834 
1835 
1836 
1843 static void
1845 {
1846  struct ConsensusSession *session = task->step->session;
1847  struct SetKey sk_in;
1848  struct SetKey sk_out;
1849  struct RfnKey rk_in;
1850  struct SetEntry *set_out;
1851  struct ReferendumEntry *rfn_in;
1853  struct RfnElementInfo *ri;
1854  struct SetMutationProgressCls *progress_cls;
1855  uint16_t worst_majority = UINT16_MAX;
1856 
1857  sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1858  rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1859  sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1860 
1861  set_out = lookup_set (session, &sk_out);
1862  if (NULL == set_out)
1863  {
1864  create_set_copy_for_task (task, &sk_in, &sk_out);
1865  return;
1866  }
1867 
1868  rfn_in = lookup_rfn (session, &rk_in);
1869  GNUNET_assert (NULL != rfn_in);
1870 
1871  progress_cls = GNUNET_new (struct SetMutationProgressCls);
1872  progress_cls->task = task;
1873 
1875 
1876  while (GNUNET_YES ==
1878  NULL,
1879  (const void **) &ri))
1880  {
1881  uint16_t majority_num;
1882  enum ReferendumVote majority_vote;
1883 
1884  rfn_majority (rfn_in, ri, &majority_num, &majority_vote);
1885 
1886  if (worst_majority > majority_num)
1887  worst_majority = majority_num;
1888 
1889  switch (majority_vote)
1890  {
1891  case VOTE_ADD:
1892  progress_cls->num_pending++;
1894  GNUNET_SET_add_element (set_out->h,
1895  ri->element,
1897  progress_cls));
1899  "P%u: apply round: adding element %s with %u-majority.\n",
1900  session->local_peer_idx,
1901  debug_str_element (ri->element), majority_num);
1902  break;
1903  case VOTE_REMOVE:
1904  progress_cls->num_pending++;
1906  GNUNET_SET_remove_element (set_out->h,
1907  ri->element,
1909  progress_cls));
1911  "P%u: apply round: deleting element %s with %u-majority.\n",
1912  session->local_peer_idx,
1913  debug_str_element (ri->element), majority_num);
1914  break;
1915  case VOTE_STAY:
1917  "P%u: apply round: keeping element %s with %u-majority.\n",
1918  session->local_peer_idx,
1919  debug_str_element (ri->element), majority_num);
1920  // do nothing
1921  break;
1922  default:
1923  GNUNET_assert (0);
1924  break;
1925  }
1926  }
1927 
1928  if (0 == progress_cls->num_pending)
1929  {
1930  // call closure right now, no pending ops
1931  GNUNET_free (progress_cls);
1932  finish_task (task);
1933  }
1934 
1935  {
1936  uint16_t thresh = (session->num_peers / 3) * 2;
1937 
1938  if (worst_majority >= thresh)
1939  {
1940  switch (session->early_stopping)
1941  {
1942  case EARLY_STOPPING_NONE:
1945  "P%u: Stopping early (after one more superround)\n",
1946  session->local_peer_idx);
1947  break;
1949  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1950  session->local_peer_idx);
1952  {
1953  struct Step *step;
1954  for (step = session->steps_head; NULL != step; step = step->next)
1955  try_finish_step_early (step);
1956  }
1957  break;
1958  case EARLY_STOPPING_DONE:
1959  /* We shouldn't be here anymore after early stopping */
1960  GNUNET_break (0);
1961  break;
1962  default:
1963  GNUNET_assert (0);
1964  break;
1965  }
1966  }
1967  else if (EARLY_STOPPING_NONE != session->early_stopping)
1968  {
1969  // Our assumption about the number of bad peers
1970  // has been broken.
1971  GNUNET_break_op (0);
1972  }
1973  else
1974  {
1975  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1976  session->local_peer_idx);
1977  }
1978  }
1980 }
1981 
1982 
1983 static void
1985 {
1986  struct ConsensusSession *session = task->step->session;
1987  struct ReferendumEntry *output_rfn;
1988  struct ReferendumEntry *input_rfn;
1989  struct DiffEntry *input_diff;
1990  struct RfnKey rfn_key;
1991  struct DiffKey diff_key;
1993  struct RfnElementInfo *ri;
1994  unsigned int gradecast_confidence = 2;
1995 
1996  rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1997  output_rfn = lookup_rfn (session, &rfn_key);
1998  if (NULL == output_rfn)
1999  {
2000  output_rfn = rfn_create (session->num_peers);
2001  output_rfn->key = rfn_key;
2002  put_rfn (session, output_rfn);
2003  }
2004 
2005  diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2006  input_diff = lookup_diff (session, &diff_key);
2007  GNUNET_assert (NULL != input_diff);
2008 
2009  rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2010  input_rfn = lookup_rfn (session, &rfn_key);
2011  GNUNET_assert (NULL != input_rfn);
2012 
2014 
2015  apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, session->num_peers);
2016 
2017  while (GNUNET_YES ==
2019  NULL,
2020  (const void **) &ri))
2021  {
2022  uint16_t majority_num;
2023  enum ReferendumVote majority_vote;
2024 
2025  // XXX: we need contested votes and non-contested votes here
2026  rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2027 
2028  if (majority_num <= session->num_peers / 3)
2029  majority_vote = VOTE_REMOVE;
2030 
2031  switch (majority_vote)
2032  {
2033  case VOTE_STAY:
2034  break;
2035  case VOTE_ADD:
2036  rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
2037  break;
2038  case VOTE_REMOVE:
2039  rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2040  break;
2041  default:
2042  GNUNET_assert (0);
2043  break;
2044  }
2045  }
2047 
2048  {
2049  uint16_t noncontested;
2050  noncontested = rfn_noncontested (input_rfn);
2051  if (noncontested < (session->num_peers / 3) * 2)
2052  {
2053  gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2054  }
2055  if (noncontested < (session->num_peers / 3) + 1)
2056  {
2057  gradecast_confidence = 0;
2058  }
2059  }
2060 
2061  if (gradecast_confidence >= 1)
2062  rfn_commit (output_rfn, task->key.leader);
2063 
2064  if (gradecast_confidence <= 1)
2065  session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2066 
2067  finish_task (task);
2068 }
2069 
2070 
2071 static void
2073 {
2074  struct SetEntry *input;
2075  struct SetOpCls *setop = &task->cls.setop;
2076  struct ConsensusSession *session = task->step->session;
2077 
2078  input = lookup_set (session, &setop->input_set);
2079  GNUNET_assert (NULL != input);
2080  GNUNET_assert (NULL != input->h);
2081 
2082  /* We create the outputs for the operation here
2083  (rather than in the set operation callback)
2084  because we want something valid in there, even
2085  if the other peer doesn't talk to us */
2086 
2087  if (SET_KIND_NONE != setop->output_set.set_kind)
2088  {
2089  /* If we don't have an existing output set,
2090  we clone the input set. */
2091  if (NULL == lookup_set (session, &setop->output_set))
2092  {
2093  create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
2094  return;
2095  }
2096  }
2097 
2098  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2099  {
2100  if (NULL == lookup_rfn (session, &setop->output_rfn))
2101  {
2102  struct ReferendumEntry *rfn;
2103 
2105  "P%u: output rfn <%s> missing, creating.\n",
2106  session->local_peer_idx,
2107  debug_str_rfn_key (&setop->output_rfn));
2108 
2109  rfn = rfn_create (session->num_peers);
2110  rfn->key = setop->output_rfn;
2111  put_rfn (session, rfn);
2112  }
2113  }
2114 
2115  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2116  {
2117  if (NULL == lookup_diff (session, &setop->output_diff))
2118  {
2119  struct DiffEntry *diff;
2120 
2121  diff = diff_create ();
2122  diff->key = setop->output_diff;
2123  put_diff (session, diff);
2124  }
2125  }
2126 
2127  if ( (task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx) )
2128  {
2129  /* XXX: mark the corresponding rfn as commited if necessary */
2130  finish_task (task);
2131  return;
2132  }
2133 
2134  if (task->key.peer1 == session->local_peer_idx)
2135  {
2137 
2139  "P%u: Looking up set {%s} to run remote union\n",
2140  session->local_peer_idx,
2141  debug_str_set_key (&setop->input_set));
2142 
2144  rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
2145 
2146  rcm.kind = htons (task->key.kind);
2147  rcm.peer1 = htons (task->key.peer1);
2148  rcm.peer2 = htons (task->key.peer2);
2149  rcm.leader = htons (task->key.leader);
2150  rcm.repetition = htons (task->key.repetition);
2151  rcm.is_contested = htons (0);
2152 
2153  GNUNET_assert (NULL == setop->op);
2154  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2155  session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
2156 
2157  struct GNUNET_SET_Option opts[] = {
2158  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2160  };
2161 
2162  // XXX: maybe this should be done while
2163  // setting up tasks alreays?
2164  setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
2165  &session->global_id,
2166  &rcm.header,
2168  opts,
2169  set_result_cb,
2170  task);
2171 
2172  commit_set (session, task);
2173  }
2174  else if (task->key.peer2 == session->local_peer_idx)
2175  {
2176  /* Wait for the other peer to contact us */
2177  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2178  session->local_peer_idx, task->key.peer1);
2179 
2180  if (NULL != setop->op)
2181  {
2182  commit_set (session, task);
2183  }
2184  }
2185  else
2186  {
2187  /* We made an error while constructing the task graph. */
2188  GNUNET_assert (0);
2189  }
2190 }
2191 
2192 
2193 static void
2195 {
2197  struct ReferendumEntry *input_rfn;
2198  struct RfnElementInfo *ri;
2199  struct SetEntry *output_set;
2200  struct SetMutationProgressCls *progress_cls;
2201  struct ConsensusSession *session = task->step->session;
2202  struct SetKey sk_in;
2203  struct SetKey sk_out;
2204  struct RfnKey rk_in;
2205 
2206  sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2207  sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2208  output_set = lookup_set (session, &sk_out);
2209  if (NULL == output_set)
2210  {
2211  create_set_copy_for_task (task, &sk_in, &sk_out);
2212  return;
2213  }
2214 
2215 
2216  {
2217  // FIXME: should be marked as a shallow copy, so
2218  // we can destroy everything correctly
2219  struct SetEntry *last_set = GNUNET_new (struct SetEntry);
2220  last_set->h = output_set->h;
2221  last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2222  put_set (session, last_set);
2223  }
2224 
2226  "Evaluating referendum in Task {%s}\n",
2227  debug_str_task_key (&task->key));
2228 
2229  progress_cls = GNUNET_new (struct SetMutationProgressCls);
2230  progress_cls->task = task;
2231 
2232  rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2233  input_rfn = lookup_rfn (session, &rk_in);
2234 
2235  GNUNET_assert (NULL != input_rfn);
2236 
2238  GNUNET_assert (NULL != iter);
2239 
2240  while (GNUNET_YES ==
2242  NULL,
2243  (const void **) &ri))
2244  {
2245  enum ReferendumVote majority_vote;
2246  uint16_t majority_num;
2247 
2248  rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
2249 
2250  if (majority_num < session->num_peers / 3)
2251  {
2252  /* It is not the case that all nonfaulty peers
2253  echoed the same value. Since we're doing a set reconciliation, we
2254  can't simply send "nothing" for the value. Thus we mark our 'confirm'
2255  reconciliation as contested. Other peers might not know that the
2256  leader is faulty, thus we still re-distribute in the confirmation
2257  round. */
2258  output_set->is_contested = GNUNET_YES;
2259  }
2260 
2261  switch (majority_vote)
2262  {
2263  case VOTE_ADD:
2264  progress_cls->num_pending++;
2266  GNUNET_SET_add_element (output_set->h,
2267  ri->element,
2269  progress_cls));
2270  break;
2271  case VOTE_REMOVE:
2272  progress_cls->num_pending++;
2274  GNUNET_SET_remove_element (output_set->h,
2275  ri->element,
2277  progress_cls));
2278  break;
2279  case VOTE_STAY:
2280  /* Nothing to do. */
2281  break;
2282  default:
2283  /* not reached */
2284  GNUNET_assert (0);
2285  }
2286  }
2287 
2289 
2290  if (0 == progress_cls->num_pending)
2291  {
2292  // call closure right now, no pending ops
2293  GNUNET_free (progress_cls);
2294  finish_task (task);
2295  }
2296 }
2297 
2298 
2299 static void
2301 {
2302  struct SetEntry *final_set;
2303  struct ConsensusSession *session = task->step->session;
2304 
2305  final_set = lookup_set (session, &task->cls.finish.input_set);
2306 
2307  GNUNET_assert (NULL != final_set);
2308 
2309 
2310  GNUNET_SET_iterate (final_set->h,
2312  task);
2313 }
2314 
2315 static void
2316 start_task (struct ConsensusSession *session, struct TaskEntry *task)
2317 {
2318  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key (&task->key));
2319 
2320  GNUNET_assert (GNUNET_NO == task->is_started);
2321  GNUNET_assert (GNUNET_NO == task->is_finished);
2322  GNUNET_assert (NULL != task->start);
2323 
2324  task->start (task);
2325 
2326  task->is_started = GNUNET_YES;
2327 }
2328 
2329 
2330 
2331 
2332 /*
2333  * Run all steps of the session that don't any
2334  * more dependencies.
2335  */
2336 static void
2338 {
2339  struct Step *step;
2340 
2341  step = session->steps_head;
2342 
2343  while (NULL != step)
2344  {
2345  if ( (GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished) )
2346  {
2347  size_t i;
2348 
2349  GNUNET_assert (0 == step->finished_tasks);
2350 
2351 #ifdef GNUNET_EXTRA_LOGGING
2352  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2353  session->local_peer_idx,
2354  step->debug_name,
2355  step->round, step->tasks_len, step->subordinates_len);
2356 #endif
2357 
2358  step->is_running = GNUNET_YES;
2359  for (i = 0; i < step->tasks_len; i++)
2360  start_task (session, step->tasks[i]);
2361 
2362  /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2363  if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2364  finish_step (step);
2365 
2366  /* Running the next ready steps will be triggered by task completion */
2367  return;
2368  }
2369  step = step->next;
2370  }
2371 
2372  return;
2373 }
2374 
2375 
2376 
2377 static void
2378 finish_task (struct TaskEntry *task)
2379 {
2380  GNUNET_assert (GNUNET_NO == task->is_finished);
2381  task->is_finished = GNUNET_YES;
2382 
2383  task->step->finished_tasks++;
2384 
2386  "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2387  task->step->session->local_peer_idx,
2388  debug_str_task_key (&task->key),
2389  (unsigned int) task->step->finished_tasks,
2390  (unsigned int) task->step->tasks_len);
2391 
2392  if (task->step->finished_tasks == task->step->tasks_len)
2393  finish_step (task->step);
2394 }
2395 
2396 
2404 static int
2405 get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2406 {
2407  int i;
2408  for (i = 0; i < session->num_peers; i++)
2409  if (0 == GNUNET_memcmp (peer, &session->peers[i]))
2410  return i;
2411  return -1;
2412 }
2413 
2414 
2424 static void
2426  const struct GNUNET_HashCode *local_session_id)
2427 {
2428  const char *salt = "gnunet-service-consensus/session_id";
2429 
2431  GNUNET_CRYPTO_kdf (&session->global_id,
2432  sizeof (struct GNUNET_HashCode),
2433  salt,
2434  strlen (salt),
2435  session->peers,
2436  session->num_peers * sizeof (struct GNUNET_PeerIdentity),
2437  local_session_id,
2438  sizeof (struct GNUNET_HashCode),
2439  NULL));
2440 }
2441 
2442 
2450 static int
2451 peer_id_cmp (const void *h1, const void *h2)
2452 {
2453  return memcmp (h1, h2, sizeof (struct GNUNET_PeerIdentity));
2454 }
2455 
2456 
2464 static void
2466  const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2467 {
2468  const struct GNUNET_PeerIdentity *msg_peers
2469  = (const struct GNUNET_PeerIdentity *) &join_msg[1];
2470  int local_peer_in_list;
2471 
2472  session->num_peers = ntohl (join_msg->num_peers);
2473 
2474  /* Peers in the join message, may or may not include the local peer,
2475  Add it if it is missing. */
2476  local_peer_in_list = GNUNET_NO;
2477  for (unsigned int i = 0; i < session->num_peers; i++)
2478  {
2479  if (0 == GNUNET_memcmp (&msg_peers[i],
2480  &my_peer))
2481  {
2482  local_peer_in_list = GNUNET_YES;
2483  break;
2484  }
2485  }
2486  if (GNUNET_NO == local_peer_in_list)
2487  session->num_peers++;
2488 
2489  session->peers = GNUNET_new_array (session->num_peers,
2490  struct GNUNET_PeerIdentity);
2491  if (GNUNET_NO == local_peer_in_list)
2492  session->peers[session->num_peers - 1] = my_peer;
2493 
2494  GNUNET_memcpy (session->peers,
2495  msg_peers,
2496  ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity));
2497  qsort (session->peers,
2498  session->num_peers,
2499  sizeof (struct GNUNET_PeerIdentity),
2500  &peer_id_cmp);
2501 }
2502 
2503 
2504 static struct TaskEntry *
2506  struct TaskKey *key)
2507 {
2508  struct GNUNET_HashCode hash;
2509 
2510 
2511  GNUNET_CRYPTO_hash (key, sizeof (struct TaskKey), &hash);
2512  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2513  GNUNET_h2s (&hash));
2514  return GNUNET_CONTAINER_multihashmap_get (session->taskmap, &hash);
2515 }
2516 
2517 
2533 static void
2534 set_listen_cb (void *cls,
2535  const struct GNUNET_PeerIdentity *other_peer,
2536  const struct GNUNET_MessageHeader *context_msg,
2537  struct GNUNET_SET_Request *request)
2538 {
2539  struct ConsensusSession *session = cls;
2540  struct TaskKey tk;
2541  struct TaskEntry *task;
2543 
2544  if (NULL == context_msg)
2545  {
2546  GNUNET_break_op (0);
2547  return;
2548  }
2549 
2550  if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs (context_msg->type))
2551  {
2552  GNUNET_break_op (0);
2553  return;
2554  }
2555 
2556  if (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs (context_msg->size))
2557  {
2558  GNUNET_break_op (0);
2559  return;
2560  }
2561 
2562  cm = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
2563 
2564  tk = ((struct TaskKey) {
2565  .kind = ntohs (cm->kind),
2566  .peer1 = ntohs (cm->peer1),
2567  .peer2 = ntohs (cm->peer2),
2568  .repetition = ntohs (cm->repetition),
2569  .leader = ntohs (cm->leader),
2570  });
2571 
2572  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2573  session->local_peer_idx, debug_str_task_key (&tk));
2574 
2575  task = lookup_task (session, &tk);
2576 
2577  if (NULL == task)
2578  {
2579  GNUNET_break_op (0);
2580  return;
2581  }
2582 
2583  if (GNUNET_YES == task->is_finished)
2584  {
2585  GNUNET_break_op (0);
2586  return;
2587  }
2588 
2589  if (task->key.peer2 != session->local_peer_idx)
2590  {
2591  /* We're being asked, so we must be thne 2nd peer. */
2592  GNUNET_break_op (0);
2593  return;
2594  }
2595 
2596  GNUNET_assert (! ((task->key.peer1 == session->local_peer_idx) &&
2597  (task->key.peer2 == session->local_peer_idx)));
2598 
2599  struct GNUNET_SET_Option opts[] = {
2600  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2602  };
2603 
2604  task->cls.setop.op = GNUNET_SET_accept (request,
2606  opts,
2607  set_result_cb,
2608  task);
2609 
2610  /* If the task hasn't been started yet,
2611  we wait for that until we commit. */
2612 
2613  if (GNUNET_YES == task->is_started)
2614  {
2615  commit_set (session, task);
2616  }
2617 }
2618 
2619 
2620 
2621 static void
2623  struct TaskEntry *t)
2624 {
2625  struct GNUNET_HashCode round_hash;
2626  struct Step *s;
2627 
2628  GNUNET_assert (NULL != t->step);
2629 
2630  t = GNUNET_memdup (t, sizeof (struct TaskEntry));
2631 
2632  s = t->step;
2633 
2634  if (s->tasks_len == s->tasks_cap)
2635  {
2636  unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2638  s->tasks_cap,
2639  target_size);
2640  }
2641 
2642 #ifdef GNUNET_EXTRA_LOGGING
2643  GNUNET_assert (NULL != s->debug_name);
2644  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2645  debug_str_task_key (&t->key),
2646  s->debug_name);
2647 #endif
2648 
2649  s->tasks[s->tasks_len] = t;
2650  s->tasks_len++;
2651 
2652  GNUNET_CRYPTO_hash (&t->key, sizeof (struct TaskKey), &round_hash);
2654  GNUNET_CONTAINER_multihashmap_put (taskmap, &round_hash, t,
2656 }
2657 
2658 
2659 static void
2661 {
2662  /* Given the fully constructed task graph
2663  with rounds for tasks, we can give the tasks timeouts. */
2664 
2665  // unsigned int max_round;
2666 
2667  /* XXX: implement! */
2668 }
2669 
2670 
2671 
2672 /*
2673  * Arrange two peers in some canonical order.
2674  */
2675 static void
2676 arrange_peers (uint16_t *p1, uint16_t *p2, uint16_t n)
2677 {
2678  uint16_t a;
2679  uint16_t b;
2680 
2681  GNUNET_assert (*p1 < n);
2682  GNUNET_assert (*p2 < n);
2683 
2684  if (*p1 < *p2)
2685  {
2686  a = *p1;
2687  b = *p2;
2688  }
2689  else
2690  {
2691  a = *p2;
2692  b = *p1;
2693  }
2694 
2695  /* For uniformly random *p1, *p2,
2696  this condition is true with 50% chance */
2697  if (((b - a) + n) % n <= n / 2)
2698  {
2699  *p1 = a;
2700  *p2 = b;
2701  }
2702  else
2703  {
2704  *p1 = b;
2705  *p2 = a;
2706  }
2707 }
2708 
2709 
2713 static void
2714 step_depend_on (struct Step *step, struct Step *dep)
2715 {
2716  /* We're not checking for cyclic dependencies,
2717  but this is a cheap sanity check. */
2718  GNUNET_assert (step != dep);
2719  GNUNET_assert (NULL != step);
2720  GNUNET_assert (NULL != dep);
2721  GNUNET_assert (dep->round <= step->round);
2722 
2723 #ifdef GNUNET_EXTRA_LOGGING
2724  /* Make sure we have complete debugging information.
2725  Also checks that we don't screw up too badly
2726  constructing the task graph. */
2727  GNUNET_assert (NULL != step->debug_name);
2728  GNUNET_assert (NULL != dep->debug_name);
2730  "Making step `%s' depend on `%s'\n",
2731  step->debug_name,
2732  dep->debug_name);
2733 #endif
2734 
2735  if (dep->subordinates_cap == dep->subordinates_len)
2736  {
2737  unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2739  dep->subordinates_cap,
2740  target_size);
2741  }
2742 
2744 
2745  dep->subordinates[dep->subordinates_len] = step;
2746  dep->subordinates_len++;
2747 
2748  step->pending_prereq++;
2749 }
2750 
2751 
2752 static struct Step *
2754 {
2755  struct Step *step;
2756  step = GNUNET_new (struct Step);
2757  step->session = session;
2758  step->round = round;
2761  session->steps_tail,
2762  step);
2763  return step;
2764 }
2765 
2766 
2771 static void
2773  uint16_t rep,
2774  uint16_t lead,
2775  struct Step *step_before,
2776  struct Step *step_after)
2777 {
2778  uint16_t n = session->num_peers;
2779  uint16_t me = session->local_peer_idx;
2780 
2781  uint16_t p1;
2782  uint16_t p2;
2783 
2784  /* The task we're currently setting up. */
2785  struct TaskEntry task;
2786 
2787  struct Step *step;
2788  struct Step *prev_step;
2789 
2790  uint16_t round;
2791 
2792  unsigned int k;
2793 
2794  round = step_before->round + 1;
2795 
2796  /* gcast step 1: leader disseminates */
2797 
2798  step = create_step (session, round, GNUNET_YES);
2799 
2800 #ifdef GNUNET_EXTRA_LOGGING
2801  GNUNET_asprintf (&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2802 #endif
2803  step_depend_on (step, step_before);
2804 
2805  if (lead == me)
2806  {
2807  for (k = 0; k < n; k++)
2808  {
2809  if (k == me)
2810  continue;
2811  p1 = me;
2812  p2 = k;
2813  arrange_peers (&p1, &p2, n);
2814  task = ((struct TaskEntry) {
2815  .step = step,
2816  .start = task_start_reconcile,
2817  .cancel = task_cancel_reconcile,
2818  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2819  });
2820  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2821  put_task (session->taskmap, &task);
2822  }
2823  /* We run this task to make sure that the leader
2824  has the stored the SET_KIND_LEADER set of himself,
2825  so it can participate in the rest of the gradecast
2826  without the code having to handle any special cases. */
2827  task = ((struct TaskEntry) {
2828  .step = step,
2829  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2830  .start = task_start_reconcile,
2831  .cancel = task_cancel_reconcile,
2832  });
2833  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2834  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2835  task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2836  put_task (session->taskmap, &task);
2837  }
2838  else
2839  {
2840  p1 = me;
2841  p2 = lead;
2842  arrange_peers (&p1, &p2, n);
2843  task = ((struct TaskEntry) {
2844  .step = step,
2845  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2846  .start = task_start_reconcile,
2847  .cancel = task_cancel_reconcile,
2848  });
2849  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2850  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2851  task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2852  put_task (session->taskmap, &task);
2853  }
2854 
2855  /* gcast phase 2: echo */
2856  prev_step = step;
2857  round += 1;
2858  step = create_step (session, round, GNUNET_YES);
2859 #ifdef GNUNET_EXTRA_LOGGING
2860  GNUNET_asprintf (&step->debug_name, "echo leader %u rep %u", lead, rep);
2861 #endif
2862  step_depend_on (step, prev_step);
2863 
2864  for (k = 0; k < n; k++)
2865  {
2866  p1 = k;
2867  p2 = me;
2868  arrange_peers (&p1, &p2, n);
2869  task = ((struct TaskEntry) {
2870  .step = step,
2871  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2872  .start = task_start_reconcile,
2873  .cancel = task_cancel_reconcile,
2874  });
2875  task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2876  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2877  put_task (session->taskmap, &task);
2878  }
2879 
2880  prev_step = step;
2881  /* Same round, since step only has local tasks */
2882  step = create_step (session, round, GNUNET_YES);
2883 #ifdef GNUNET_EXTRA_LOGGING
2884  GNUNET_asprintf (&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2885 #endif
2886  step_depend_on (step, prev_step);
2887 
2888  arrange_peers (&p1, &p2, n);
2889  task = ((struct TaskEntry) {
2890  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2891  .step = step,
2892  .start = task_start_eval_echo
2893  });
2894  put_task (session->taskmap, &task);
2895 
2896  prev_step = step;
2897  round += 1;
2898  step = create_step (session, round, GNUNET_YES);
2899 #ifdef GNUNET_EXTRA_LOGGING
2900  GNUNET_asprintf (&step->debug_name, "confirm leader %u rep %u", lead, rep);
2901 #endif
2902  step_depend_on (step, prev_step);
2903 
2904  /* gcast phase 3: confirmation and grading */
2905  for (k = 0; k < n; k++)
2906  {
2907  p1 = k;
2908  p2 = me;
2909  arrange_peers (&p1, &p2, n);
2910  task = ((struct TaskEntry) {
2911  .step = step,
2912  .start = task_start_reconcile,
2913  .cancel = task_cancel_reconcile,
2914  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead},
2915  });
2916  task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2917  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2918  /* If there was at least one element in the echo round that was
2919  contested (i.e. it had no n-t majority), then we let the other peers
2920  know, and other peers let us know. The contested flag for each peer is
2921  stored in the rfn. */
2923  put_task (session->taskmap, &task);
2924  }
2925 
2926  prev_step = step;
2927  /* Same round, since step only has local tasks */
2928  step = create_step (session, round, GNUNET_YES);
2929 #ifdef GNUNET_EXTRA_LOGGING
2930  GNUNET_asprintf (&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2931 #endif
2932  step_depend_on (step, prev_step);
2933 
2934  task = ((struct TaskEntry) {
2935  .step = step,
2936  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2937  .start = task_start_grade,
2938  });
2939  put_task (session->taskmap, &task);
2940 
2941  step_depend_on (step_after, step);
2942 }
2943 
2944 
2945 static void
2947 {
2948  uint16_t n = session->num_peers;
2949  uint16_t t = n / 3;
2950 
2951  uint16_t me = session->local_peer_idx;
2952 
2953  /* The task we're currently setting up. */
2954  struct TaskEntry task;
2955 
2956  /* Current leader */
2957  unsigned int lead;
2958 
2959  struct Step *step;
2960  struct Step *prev_step;
2961 
2962  unsigned int round = 0;
2963 
2964  unsigned int i;
2965 
2966  // XXX: introduce first step,
2967  // where we wait for all insert acks
2968  // from the set service
2969 
2970  /* faster but brittle all-to-all */
2971 
2972  // XXX: Not implemented yet
2973 
2974  /* all-to-all step */
2975 
2976  step = create_step (session, round, GNUNET_NO);
2977 
2978 #ifdef GNUNET_EXTRA_LOGGING
2979  step->debug_name = GNUNET_strdup ("all to all");
2980 #endif
2981 
2982  for (i = 0; i < n; i++)
2983  {
2984  uint16_t p1;
2985  uint16_t p2;
2986 
2987  p1 = me;
2988  p2 = i;
2989  arrange_peers (&p1, &p2, n);
2990  task = ((struct TaskEntry) {
2991  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
2992  .step = step,
2993  .start = task_start_reconcile,
2994  .cancel = task_cancel_reconcile,
2995  });
2996  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
2997  task.cls.setop.output_set = task.cls.setop.input_set;
2999  put_task (session->taskmap, &task);
3000  }
3001 
3002  round += 1;
3003  prev_step = step;
3004  step = create_step (session, round, GNUNET_NO);;
3005 #ifdef GNUNET_EXTRA_LOGGING
3006  step->debug_name = GNUNET_strdup ("all to all 2");
3007 #endif
3008  step_depend_on (step, prev_step);
3009 
3010 
3011  for (i = 0; i < n; i++)
3012  {
3013  uint16_t p1;
3014  uint16_t p2;
3015 
3016  p1 = me;
3017  p2 = i;
3018  arrange_peers (&p1, &p2, n);
3019  task = ((struct TaskEntry) {
3020  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3021  .step = step,
3022  .start = task_start_reconcile,
3023  .cancel = task_cancel_reconcile,
3024  });
3025  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3026  task.cls.setop.output_set = task.cls.setop.input_set;
3028  put_task (session->taskmap, &task);
3029  }
3030 
3031  round += 1;
3032 
3033  prev_step = step;
3034  step = NULL;
3035 
3036 
3037 
3038  /* Byzantine union */
3039 
3040  /* sequential repetitions of the gradecasts */
3041  for (i = 0; i < t + 1; i++)
3042  {
3043  struct Step *step_rep_start;
3044  struct Step *step_rep_end;
3045 
3046  /* Every repetition is in a separate round. */
3047  step_rep_start = create_step (session, round, GNUNET_YES);
3048 #ifdef GNUNET_EXTRA_LOGGING
3049  GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
3050 #endif
3051 
3052  step_depend_on (step_rep_start, prev_step);
3053 
3054  /* gradecast has three rounds */
3055  round += 3;
3056  step_rep_end = create_step (session, round, GNUNET_YES);
3057 #ifdef GNUNET_EXTRA_LOGGING
3058  GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
3059 #endif
3060 
3061  /* parallel gradecasts */
3062  for (lead = 0; lead < n; lead++)
3063  construct_task_graph_gradecast (session, i, lead, step_rep_start, step_rep_end);
3064 
3065  task = ((struct TaskEntry) {
3066  .step = step_rep_end,
3067  .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
3068  .start = task_start_apply_round,
3069  });
3070  put_task (session->taskmap, &task);
3071 
3072  prev_step = step_rep_end;
3073  }
3074 
3075  /* There is no next gradecast round, thus the final
3076  start step is the overall end step of the gradecasts */
3077  round += 1;
3078  step = create_step (session, round, GNUNET_NO);
3079 #ifdef GNUNET_EXTRA_LOGGING
3080  GNUNET_asprintf (&step->debug_name, "finish");
3081 #endif
3082  step_depend_on (step, prev_step);
3083 
3084  task = ((struct TaskEntry) {
3085  .step = step,
3086  .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3087  .start = task_start_finish,
3088  });
3089  task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3090 
3091  put_task (session->taskmap, &task);
3092 }
3093 
3094 
3095 
3103 static int
3105  const struct GNUNET_CONSENSUS_JoinMessage *m)
3106 {
3107  uint32_t listed_peers = ntohl (m->num_peers);
3108 
3109  if ( (ntohs (m->header.size) - sizeof (*m)) !=
3110  listed_peers * sizeof (struct GNUNET_PeerIdentity))
3111  {
3112  GNUNET_break (0);
3113  return GNUNET_SYSERR;
3114  }
3115  return GNUNET_OK;
3116 }
3117 
3118 
3125 static void
3127  const struct GNUNET_CONSENSUS_JoinMessage *m)
3128 {
3129  struct ConsensusSession *session = cls;
3130  struct ConsensusSession *other_session;
3131 
3133  m);
3134  compute_global_id (session,
3135  &m->session_id);
3136 
3137  /* Check if some local client already owns the session.
3138  It is only legal to have a session with an existing global id
3139  if all other sessions with this global id are finished.*/
3140  for (other_session = sessions_head;
3141  NULL != other_session;
3142  other_session = other_session->next)
3143  {
3144  if ( (other_session != session) &&
3145  (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id,
3146  &other_session->global_id)) )
3147  break;
3148  }
3149 
3150  session->conclude_deadline
3152  session->conclude_start
3154  session->local_peer_idx = get_peer_idx (&my_peer,
3155  session);
3156  GNUNET_assert (-1 != session->local_peer_idx);
3157 
3159  "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3160  GNUNET_h2s (&m->session_id),
3161  session->num_peers,
3162  session->local_peer_idx,
3165  session->conclude_deadline),
3166  GNUNET_YES));
3167 
3168  session->set_listener
3169  = GNUNET_SET_listen (cfg,
3171  &session->global_id,
3172  &set_listen_cb,
3173  session);
3174 
3176  GNUNET_NO);
3178  GNUNET_NO);
3180  GNUNET_NO);
3182  GNUNET_NO);
3183 
3184  {
3185  struct SetEntry *client_set;
3186 
3187  client_set = GNUNET_new (struct SetEntry);
3188  client_set->h = GNUNET_SET_create (cfg,
3190  struct SetHandle *sh = GNUNET_new (struct SetHandle);
3191  sh->h = client_set->h;
3193  session->set_handles_tail,
3194  sh);
3195  client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3196  put_set (session,
3197  client_set);
3198  }
3199 
3200  session->peers_blacklisted = GNUNET_new_array (session->num_peers,
3201  int);
3202 
3203  /* Just construct the task graph,
3204  but don't run anything until the client calls conclude. */
3205  construct_task_graph (session);
3207 }
3208 
3209 
3210 static void
3212 {
3213  // FIXME: implement
3214 }
3215 
3216 
3224 static int
3226  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3227 {
3228  return GNUNET_OK;
3229 }
3230 
3231 
3238 static void
3240  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3241 {
3242  struct ConsensusSession *session = cls;
3243  ssize_t element_size;
3244  struct GNUNET_SET_Handle *initial_set;
3245  struct ConsensusElement *ce;
3246 
3247  if (GNUNET_YES == session->conclude_started)
3248  {
3249  GNUNET_break (0);
3251  return;
3252  }
3253 
3254  element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
3255  ce = GNUNET_malloc (sizeof (struct ConsensusElement) + element_size);
3256  GNUNET_memcpy (&ce[1], &msg[1], element_size);
3257  ce->payload_type = msg->element_type;
3258 
3259  struct GNUNET_SET_Element element = {
3261  .size = sizeof (struct ConsensusElement) + element_size,
3262  .data = ce,
3263  };
3264 
3265  {
3266  struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3267  struct SetEntry *entry;
3268 
3269  entry = lookup_set (session,
3270  &key);
3271  GNUNET_assert (NULL != entry);
3272  initial_set = entry->h;
3273  }
3274 
3275  session->num_client_insert_pending++;
3276  GNUNET_SET_add_element (initial_set,
3277  &element,
3279  session);
3280 
3281 #ifdef GNUNET_EXTRA_LOGGING
3282  {
3284  "P%u: element %s added\n",
3285  session->local_peer_idx,
3286  debug_str_element (&element));
3287  }
3288 #endif
3289  GNUNET_free (ce);
3291 }
3292 
3293 
3300 static void
3302  const struct GNUNET_MessageHeader *message)
3303 {
3304  struct ConsensusSession *session = cls;
3305 
3306  if (GNUNET_YES == session->conclude_started)
3307  {
3308  /* conclude started twice */
3309  GNUNET_break (0);
3311  return;
3312  }
3314  "conclude requested\n");
3315  session->conclude_started = GNUNET_YES;
3316  install_step_timeouts (session);
3317  run_ready_steps (session);
3319 }
3320 
3321 
3327 static void
3328 shutdown_task (void *cls)
3329 {
3331  "shutting down\n");
3332  GNUNET_STATISTICS_destroy (statistics,
3333  GNUNET_NO);
3334  statistics = NULL;
3335 }
3336 
3337 
3345 static void
3346 run (void *cls,
3347  const struct GNUNET_CONFIGURATION_Handle *c,
3349 {
3350  cfg = c;
3351  if (GNUNET_OK !=
3353  &my_peer))
3354  {
3356  "Could not retrieve host identity\n");
3358  return;
3359  }
3360  statistics = GNUNET_STATISTICS_create ("consensus",
3361  cfg);
3363  NULL);
3364 }
3365 
3366 
3375 static void *
3377  struct GNUNET_SERVICE_Client *c,
3378  struct GNUNET_MQ_Handle *mq)
3379 {
3380  struct ConsensusSession *session = GNUNET_new (struct ConsensusSession);
3381 
3382  session->client = c;
3383  session->client_mq = mq;
3384  GNUNET_CONTAINER_DLL_insert (sessions_head,
3385  sessions_tail,
3386  session);
3387  return session;
3388 }
3389 
3390 
3398 static void
3400  struct GNUNET_SERVICE_Client *c,
3401  void *internal_cls)
3402 {
3403  struct ConsensusSession *session = internal_cls;
3404 
3405  if (NULL != session->set_listener)
3406  {
3408  session->set_listener = NULL;
3409  }
3410  GNUNET_CONTAINER_DLL_remove (sessions_head,
3411  sessions_tail,
3412  session);
3413 
3414  while (session->set_handles_head)
3415  {
3416  struct SetHandle *sh = session->set_handles_head;
3417  session->set_handles_head = sh->next;
3418  GNUNET_SET_destroy (sh->h);
3419  GNUNET_free (sh);
3420  }
3421  GNUNET_free (session);
3422 }
3423 
3424 
3429 ("consensus",
3431  &run,
3434  NULL,
3435  GNUNET_MQ_hd_fixed_size (client_conclude,
3437  struct GNUNET_MessageHeader,
3438  NULL),
3439  GNUNET_MQ_hd_var_size (client_insert,
3442  NULL),
3443  GNUNET_MQ_hd_var_size (client_join,
3446  NULL),
3448 
3449 /* end of gnunet-service-consensus.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
Abort a round, don&#39;t send requested elements anymore.
uint64_t first_size
Our set size from the first round.
Vote that an element should be removed.
struct FinishCls finish
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
Closure for both start_task and cancel_task.
int * peer_contested
Contestation state of the peer.
static GNUNET_NETWORK_STRUCT_END struct GNUNET_PeerIdentity me
Our own peer identity.
static int cmp_uint64_t(const void *pa, const void *pb)
struct GNUNET_CONTAINER_MultiHashMap * rfnmap
static int get_peer_idx(const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
Search peer in the list of peers in session.
struct GNUNET_SET_ListenHandle * GNUNET_SET_listen(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op_type, const struct GNUNET_HashCode *app_id, GNUNET_SET_ListenCallback listen_cb, void *listen_cls)
Wait for set operation requests for the given application ID.
Definition: set_api.c:1013
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
unsigned int subordinates_cap
int GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator&#39;s position.
int GNUNET_SET_remove_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Remove an element to the given set.
Definition: set_api.c:733
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:670
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:109
static void client_insert_done(void *cls)
struct GNUNET_STATISTICS_Handle * statistics
Statistics handle.
static int check_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Check join message.
static unsigned int element_size
static void try_finish_step_early(struct Step *step)
Weighted diff.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_SET_add_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Add an element to the given set.
Definition: set_api.c:686
int * peer_commited
Stores, for every peer in the session, whether the peer finished the whole referendum.
If a value with the given key exists, replace it.
Consensus element, either marker or payload.
struct GNUNET_SET_ListenHandle * set_listener
Listener for requests from other peers.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
uint16_t payload_type
Payload element_type, only valid if this is not a marker element.
static void commit_set(struct ConsensusSession *session, struct TaskEntry *task)
Commit the appropriate set for a task.
Handle for a set operation request from another peer.
Definition: set_api.c:115
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
struct SetHandle * set_handles_head
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
Handle to a service.
Definition: service.c:116
static void apply_diff_to_rfn(struct DiffEntry *diff, struct ReferendumEntry *rfn, uint16_t voting_peer, uint16_t num_peers)
int16_t repetition
Repetition of the gradecast phase.
static void construct_task_graph(struct ConsensusSession *session)
Element should be added to the result set of the remote peer, i.e.
static void task_start_reconcile(struct TaskEntry *task)
static void put_diff(struct ConsensusSession *session, struct DiffEntry *diff)
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
struct ReferendumEntry * rfn_create(uint16_t size)
static void task_start_grade(struct TaskEntry *task)
static void rfn_contest(struct ReferendumEntry *rfn, uint16_t contested_peer)
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1293
static void task_start_finish(struct TaskEntry *task)
Element stored in a set.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_SET_Handle * GNUNET_SET_create(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op)
Create an empty set, supporting the specified operation.
Definition: set_api.c:656
uint16_t is_contested
Non-zero if this set reconciliation had elements removed because they were contested.
Element should be added to the result set of the local peer, i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
struct GNUNET_SET_Handle * h
static struct ConsensusSession * sessions_tail
Linked list of sessions this peer participates in.
static const char * phasename(uint16_t phase)
struct GNUNET_CONTAINER_MultiHashMap * setmap
struct ConsensusSession * session
struct ConsensusSession * next
Consensus sessions are kept in a DLL.
struct DiffEntry * diff_compose(struct DiffEntry *diff_1, struct DiffEntry *diff_2)
static struct SetEntry * lookup_set(struct ConsensusSession *session, struct SetKey *key)
static struct ReferendumEntry * lookup_rfn(struct ConsensusSession *session, struct RfnKey *key)
static void arrange_peers(uint16_t *p1, uint16_t *p2, uint16_t n)
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static void finish_task(struct TaskEntry *task)
static int peer_id_cmp(const void *h1, const void *h2)
Compare two peer identities.
#define GNUNET_NO
Definition: gnunet_common.h:81
unsigned int num_peers
Number of other peers in the consensus.
#define GNUNET_memdup(buf, size)
Allocate and initialize a block of memory.
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:78
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
unsigned int tasks_len
#define GNUNET_new(type)
Allocate a struct or union of the given type.
struct GNUNET_SET_OperationHandle * GNUNET_SET_prepare(const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_HashCode *app_id, const struct GNUNET_MessageHeader *context_msg, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Prepare a set operation to be evaluated with another peer.
Definition: set_api.c:812
static struct GNUNET_PeerIdentity my_peer
Peer that runs this service.
static void compute_global_id(struct ConsensusSession *session, const struct GNUNET_HashCode *local_session_id)
Compute a global, (hopefully) unique consensus session id, from the local id of the consensus session...
static struct GNUNET_SCHEDULER_Task * t
Main task.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static struct TaskEntry * lookup_task(struct ConsensusSession *session, struct TaskKey *key)
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
struct GNUNET_TIME_AbsoluteNBO start
Start time for the consensus.
Definition: consensus.h:58
struct Step * next
All steps of one session are in a linked list for easier deallocation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:524
static int ret
Final status code.
Definition: gnunet-arm.c:89
Handle for the service.
const struct GNUNET_SET_Element * element
GNUNET_SERVICE_MAIN("consensus", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_conclude, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size(client_insert, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, struct GNUNET_CONSENSUS_ElementMessage, NULL), GNUNET_MQ_hd_var_size(client_join, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, struct GNUNET_CONSENSUS_JoinMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Last result set from a gradecast.
#define GNUNET_strdup(a)
Wrapper around GNUNET_xstrdup_.
struct GNUNET_CONTAINER_MultiHashMap * taskmap
size_t pending_prereq
Counter for the prerequisites of this step.
static struct ConsensusSession * sessions_head
Linked list of sessions this peer participates in.
void GNUNET_SET_copy_lazy(struct GNUNET_SET_Handle *set, GNUNET_SET_CopyReadyCallback cb, void *cls)
Definition: set_api.c:1190
Success, all elements have been sent (and received).
struct GNUNET_HashCode session_id
Session id of the consensus.
Definition: consensus.h:53
Internal representation of the hash map.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1242
struct SetKey input_set
#define GNUNET_NETWORK_STRUCT_BEGIN
Define as empty, GNUNET_PACKED should suffice, but this won&#39;t work on W32.
struct Step * prev
All steps of one session are in a linked list for easier deallocation.
const void * data
Actual data of the element.
Sent as context message for set reconciliation.
int * peers_blacklisted
Array of peers with length &#39;num_peers&#39;.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
static void set_copy_cb(void *cls, struct GNUNET_SET_Handle *copy)
Handle to a client that is connected to a service.
Definition: service.c:249
void GNUNET_SET_destroy(struct GNUNET_SET_Handle *set)
Destroy the set handle, and free all associated resources.
Definition: set_api.c:771
static struct DiffEntry * lookup_diff(struct ConsensusSession *session, struct DiffKey *key)
uint32_t num_peers
Number of peers (at the end of this message) that want to participate in the consensus.
Definition: consensus.h:48
struct ConsensusElement ce
static void construct_task_graph_gradecast(struct ConsensusSession *session, uint16_t rep, uint16_t lead, struct Step *step_before, struct Step *step_after)
Construct the task graph for a single gradecast.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:99
static void handle_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Called when a client wants to join a consensus session.
static void finish_step(struct Step *step)
struct GNUNET_PeerIdentity * peers
int GNUNET_asprintf(char **buf, const char *format,...)
Like asprintf, just portable.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_TIME_Absolute conclude_start
Time when the conclusion of the consensus should begin.
static void create_set_copy_for_task(struct TaskEntry *task, struct SetKey *src_set_key, struct SetKey *dst_set_key)
Call the start function of the given task again after we created a copy of the given set...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
void(* TaskFunc)(struct TaskEntry *task)
int16_t peer2
Number of the second peer in canonical order.
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
#define GNUNET_memcpy(dst, src, n)
static struct Step * create_step(struct ConsensusSession *session, int round, int early_finishable)
, &#39; bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
void GNUNET_CRYPTO_hash_create_random(enum GNUNET_CRYPTO_Quality mode, struct GNUNET_HashCode *result)
Create a random hash code.
Definition: crypto_hash.c:138
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN.
Definition: consensus.h:42
struct Step ** subordinates
The other peer refused to to the operation with us, or something went wrong.
static int check_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
struct GNUNET_CONTAINER_MultiHashMap * changes
static void diff_insert(struct DiffEntry *diff, int weight, const struct GNUNET_SET_Element *element)
static void install_step_timeouts(struct ConsensusSession *session)
struct SetHandle * set_handles_tail
uint64_t lower_bound
Bounded Eppstein lower bound.
static void set_listen_cb(void *cls, const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request)
Called when another peer wants to do a set operation with the local peer.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:44
static void handle_client_conclude(void *cls, const struct GNUNET_MessageHeader *message)
Called when a client performs the conclude operation.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct TaskEntry * task
Task to finish once all changes are through.
GNUNET_SET_Status
Status for the result callback.
Message with an element.
Definition: consensus.h:72
static struct SolverHandle * sh
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:83
struct DiffKey key
Vote that nothing should change.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:727
struct SetKey dst_set_key
int weight
Positive weight for &#39;add&#39;, negative weights for &#39;remove&#39;.
uint16_t status
See PRISM_STATUS_*-constants.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
static uint16_t task_other_peer(struct TaskEntry *task)
static void cleanup(void *cls)
Function scheduled as very last function, cleans up after us.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration of the consensus service.
p2p message definitions for consensus
struct SetKey input_set
struct GNUNET_MessageHeader header
Type: Either GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT or GNUNET_MESSAGE_TYPE_CONSENSUS_C...
Definition: consensus.h:80
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *c, void *internal_cls)
Callback called when a client disconnected from the service.
void GNUNET_STATISTICS_set(struct GNUNET_STATISTICS_Handle *handle, const char *name, uint64_t value, int make_persistent)
Set statistic value for the peer.
Apply a repetition of the all-to-all gradecast to the current set.
A 512-bit hashcode.
Client gets notified of the required changes for both the local and the remote set.
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2618
struct ConsensusSession * prev
Consensus sessions are kept in a DLL.
char * debug_name
Human-readable name for the task, used for debugging.
Opaque handle to a set.
Definition: set_api.c:49
static const char * setname(uint16_t kind)
int GNUNET_CONFIGURATION_get_value_string(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, char **value)
Get a configuration value that should be a string.
static void task_start_eval_echo(struct TaskEntry *task)
struct TaskKey key
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static uint16_t rfn_noncontested(struct ReferendumEntry *rfn)
struct TaskEntry ** tasks
Tasks that this step is composed of.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int early_finishable
When we&#39;re doing an early finish, how should this step be treated? If GNUNET_YES, the step will be ma...
#define GNUNET_SYSERR
Definition: gnunet_common.h:79
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT
Sent by service when a new element is added.
static void step_depend_on(struct Step *step, struct Step *dep)
Record dep as a dependency of step.
struct GNUNET_CONTAINER_MultiHashMap * diffmap
int is_contested
GNUNET_YES if the set resulted from applying a referendum with contested elements.
static void initialize_session_peer_list(struct ConsensusSession *session, const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
Create the sorted list of peers for the session, add the local peer if not in the join message...
struct GNUNET_HashCode global_id
Global consensus identification, computed from the session id and participating authorities.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE
Sent by client to service in order to start the consensus conclusion.
void GNUNET_SET_listen_cancel(struct GNUNET_SET_ListenHandle *lh)
Cancel the given listen operation.
Definition: set_api.c:1047
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct GNUNET_MQ_Handle * client_mq
Queued messages to the client.
Option for set operations.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
Sent by the client to the service, when the client wants the service to join a consensus session...
Definition: consensus.h:37
struct DiffEntry * diff_create()
static unsigned int num_peers
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT
Insert an element.
static void put_rfn(struct ConsensusSession *session, struct ReferendumEntry *rfn)
#define GNUNET_memcmp(a, b)
Compare memory in a and b, where both must be of the same pointer type.
unsigned int is_finished
struct DiffKey output_diff
int16_t leader
Leader in the gradecast phase.
Handle to an operation.
Definition: set_api.c:135
Handle to a message queue.
Definition: mq.c:85
A consensus session consists of one local client and the remote authorities.
Tuple of integers that together identify a task uniquely.
static void run_ready_steps(struct ConsensusSession *session)
#define GNUNET_NETWORK_STRUCT_END
Define as empty, GNUNET_PACKED should suffice, but this won&#39;t work on W32;.
unsigned int round
struct GNUNET_HashCode rand
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
static struct GNUNET_FS_PublishContext * pc
Handle to FS-publishing operation.
The identity of the host (wraps the signing key of the peer).
int GNUNET_SET_commit(struct GNUNET_SET_OperationHandle *oh, struct GNUNET_SET_Handle *set)
Commit a set to be used with a set operation.
Definition: set_api.c:1124
static const char * diffname(uint16_t kind)
uint16_t kind
A value from &#39;enum PhaseKind&#39;.
uint8_t marker
Is this a marker element?
Vote that an element should be added.
static void task_cancel_reconcile(struct TaskEntry *task)
static void set_result_cb(void *cls, const struct GNUNET_SET_Element *element, uint64_t current_size, enum GNUNET_SET_Status status)
Callback for set operation results.
#define GNUNET_PACKED
gcc-ism to get packed structs.
unsigned int finished_tasks
configuration data
Definition: configuration.c:85
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT.
struct GNUNET_SET_Handle * h
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:278
struct SetHandle * next
uint16_t element_type
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_NEW_ELEMENT.
Definition: consensus.h:85
Block type for consensus elements.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE
Sent by service to client in order to signal a completed consensus conclusion.
int16_t leader
Leader in the gradecast phase.
uint16_t size
Number of bytes in the buffer pointed to by data.
struct Step * step
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
const struct GNUNET_SET_Element * element
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
unsigned int local_peer_idx
Index of the local peer in the peers array.
unsigned int is_running
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN
Join a consensus session.
Opaque handle to a listen operation.
Definition: set_api.c:186
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
enum ReferendumVote proposal
Proposal for this element, can only be VOTE_ADD or VOTE_REMOVE.
int GNUNET_CRYPTO_get_peer_identity(const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_PeerIdentity *dst)
Retrieve the identity of the host&#39;s peer.
struct SetOpCls setop
unsigned int num_client_insert_pending
static const char * rfnname(uint16_t kind)
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *service)
Start processing consensus requests.
struct GNUNET_TIME_Absolute conclude_deadline
Timeout for all rounds together, single rounds will schedule a timeout task with a fraction of the co...
void GNUNET_SET_operation_cancel(struct GNUNET_SET_OperationHandle *oh)
Cancel the given set operation.
Definition: set_api.c:515
static void handle_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
static int send_to_client_iter(void *cls, const struct GNUNET_SET_Element *element)
Send the final result set of the consensus to the client, element by element.
struct GNUNET_SET_Element * GNUNET_SET_element_dup(const struct GNUNET_SET_Element *element)
Create a copy of an element.
Definition: set_api.c:1219
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_SET_OperationHandle * op
Header for all communications.
struct GNUNET_SERVICE_Client * client
Client that inhabits the session.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:80
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_difference(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Absolute end)
Compute the time difference between the given start and end times.
Definition: time.c:353
static void rfn_commit(struct ReferendumEntry *rfn, uint16_t commit_peer)
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:353
struct RfnKey output_rfn
static void set_mutation_done(void *cls)
union TaskFuncCls cls
struct SetHandle * prev
static void rfn_majority(const struct ReferendumEntry *rfn, const struct RfnElementInfo *ri, uint16_t *ret_majority, enum ReferendumVote *ret_vote)
For a given majority, count what the outcome is (add/remove/keep), and give the number of peers that ...
unsigned int tasks_cap
int16_t peer1
Number of the first peer in canonical order.
struct ConsensusElement ce
static void start_task(struct ConsensusSession *session, struct TaskEntry *task)
Set union, return all elements that are in at least one of the sets.
int GNUNET_SET_iterate(struct GNUNET_SET_Handle *set, GNUNET_SET_ElementIterator iter, void *iter_cls)
Iterate over all elements in the given set.
Definition: set_api.c:1168
unsigned int subordinates_len
struct SetKey output_set
static void task_start_apply_round(struct TaskEntry *task)
Apply the result from one round of gradecasts (i.e.
static void put_set(struct ConsensusSession *session, struct SetEntry *set)
struct GNUNET_CONTAINER_MultiHashMap * rfn_elements
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:91
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2533
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
struct GNUNET_TIME_AbsoluteNBO deadline
Deadline for conclude.
Definition: consensus.h:63
int early_stopping
State of our early stopping scheme.
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
static void rfn_vote(struct ReferendumEntry *rfn, uint16_t voting_peer, enum ReferendumVote vote, const struct GNUNET_SET_Element *element)
static void put_task(struct GNUNET_CONTAINER_MultiHashMap *taskmap, struct TaskEntry *t)
#define GNUNET_malloc(size)
Wrapper around malloc.
int16_t peer1
Number of the first peer in canonical order.
struct SetKey key
int16_t peer2
Number of the second peer in canonical order.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
struct TaskEntry * task
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
int16_t repetition
Repetition of the gradecast phase.
struct GNUNET_SET_OperationHandle * GNUNET_SET_accept(struct GNUNET_SET_Request *request, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Accept a request we got via GNUNET_SET_listen().
Definition: set_api.c:1081
struct GNUNET_SCHEDULER_Task * timeout_task
Fail set operations when the other peer shows weird behavior that might by a Byzantine fault...
uint16_t kind
A value from &#39;enum PhaseKind&#39;.