GNUnet  0.10.x
gnunet-service-consensus.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2012, 2013, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 #include "gnunet_block_lib.h"
30 #include "gnunet_protocols.h"
31 #include "gnunet_applications.h"
32 #include "gnunet_set_service.h"
35 #include "consensus_protocol.h"
36 #include "consensus.h"
37 
38 
44  VOTE_STAY = 0,
48  VOTE_ADD = 1,
53 };
54 
55 
60 };
61 
62 
64 
69 struct TaskKey {
73  uint16_t kind GNUNET_PACKED;
74 
80 
85 
90 
97 };
98 
99 
100 
101 struct SetKey {
102  int set_kind GNUNET_PACKED;
105 };
106 
107 
108 struct SetEntry {
109  struct SetKey key;
117 };
118 
119 
120 struct DiffKey {
121  int diff_kind GNUNET_PACKED;
124 };
125 
126 struct RfnKey {
127  int rfn_kind GNUNET_PACKED;
130 };
131 
132 
134 
135 enum PhaseKind {
149 };
150 
151 
152 enum SetKind {
161 };
162 
163 enum DiffKind {
168 };
169 
170 enum RfnKind {
175 };
176 
177 
178 struct SetOpCls {
179  struct SetKey input_set;
180 
181  struct SetKey output_set;
182  struct RfnKey output_rfn;
183  struct DiffKey output_diff;
184 
186 
188 
190 };
191 
192 
/*
 * Closure data for the `finish` member of union TaskFuncCls.
 * NOTE(review): presumably used by the task that delivers the final
 * result; only the single field below is visible here -- confirm.
 */
193 struct FinishCls {
194  struct SetKey input_set; /* key of the set this task takes as input */
195 };
196 
/*
 * Per-task closure storage, embedded in struct TaskEntry as `cls`.
 * Being a union, the members share storage, so only the member that
 * matches the kind of task may be read.
 */
201 union TaskFuncCls {
202  struct SetOpCls setop; /* read by set_result_cb via task->cls.setop */
203  struct FinishCls finish;
204 };
205 
206 struct TaskEntry;
207 
208 typedef void (*TaskFunc) (struct TaskEntry *task);
209 
210 /*
211  * Node in the consensus task graph.
212  */
213 struct TaskEntry {
214  struct TaskKey key;
215 
216  struct Step *step;
217 
219 
221 
224 
225  union TaskFuncCls cls;
226 };
227 
228 
229 struct Step {
234  struct Step *prev;
235 
240  struct Step *next;
241 
243 
247  struct TaskEntry **tasks;
248  unsigned int tasks_len;
249  unsigned int tasks_cap;
250 
251  unsigned int finished_tasks;
252 
253  /*
254  * Steps that have this step as a dependency.
255  *
256  * We store pointers to subordinates rather
257  * than to prerequisites since it makes
258  * tracking the readiness of a task easier.
259  */
260  struct Step **subordinates;
261  unsigned int subordinates_len;
262  unsigned int subordinates_cap;
263 
269 
270  /*
271  * Task that will run this step despite
272  * any pending prerequisites.
273  */
275 
276  unsigned int is_running;
277 
278  unsigned int is_finished;
279 
280  /*
281  * Synchrony round of the task.
282  * Determines the deadline for the task.
283  */
284  unsigned int round;
285 
290  char *debug_name;
291 
304 };
305 
306 
309 
310  /*
311  * GNUNET_YES if the peer votes for the proposal.
312  */
313  int *votes;
314 
319  enum ReferendumVote proposal;
320 };
321 
322 
324  struct RfnKey key;
325 
326  /*
327  * Elements where there is at least one proposed change.
328  *
329  * Maps the hash of the GNUNET_SET_Element
330  * to 'struct RfnElementInfo'.
331  */
333 
334  unsigned int num_peers;
335 
347 
348 
355 };
356 
357 
360 
365  int weight;
366 };
367 
368 
372 struct DiffEntry {
373  struct DiffKey key;
375 };
376 
377 struct SetHandle {
378  struct SetHandle *prev;
379  struct SetHandle *next;
380 
382 };
383 
384 
385 
394 
399 
401 
405 
410 
411  /*
412  * Mapping from (hashed) TaskKey to TaskEntry.
413  *
414  * We map the application_id for a round to the task that should be
415  * executed, so we don't have to go through all task whenever we get
416  * an incoming set op request.
417  */
419 
420  struct Step *steps_head;
421  struct Step *steps_tail;
422 
424 
426 
431  struct GNUNET_HashCode global_id;
432 
437 
442 
446  struct GNUNET_TIME_Absolute conclude_start;
447 
453  struct GNUNET_TIME_Absolute conclude_deadline;
454 
456 
460  unsigned int num_peers;
461 
465  unsigned int local_peer_idx;
466 
472 
477 
481  uint64_t first_size;
482 
484 
488  uint64_t lower_bound;
489 
492 };
493 
498 
503 
507 static const struct GNUNET_CONFIGURATION_Handle *cfg;
508 
513 
518 
519 
520 static void
521 finish_task(struct TaskEntry *task);
522 
523 
524 static void
525 run_ready_steps(struct ConsensusSession *session);
526 
527 
528 static const char *
529 phasename(uint16_t phase)
530 {
531  switch (phase)
532  {
533  case PHASE_KIND_ALL_TO_ALL: return "ALL_TO_ALL";
534 
535  case PHASE_KIND_ALL_TO_ALL_2: return "ALL_TO_ALL_2";
536 
537  case PHASE_KIND_FINISH: return "FINISH";
538 
539  case PHASE_KIND_GRADECAST_LEADER: return "GRADECAST_LEADER";
540 
541  case PHASE_KIND_GRADECAST_ECHO: return "GRADECAST_ECHO";
542 
543  case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
544 
545  case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
546 
547  case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
548 
549  case PHASE_KIND_APPLY_REP: return "APPLY_REP";
550 
551  default: return "(unknown)";
552  }
553 }
554 
555 
556 static const char *
557 setname(uint16_t kind)
558 {
559  switch (kind)
560  {
561  case SET_KIND_CURRENT: return "CURRENT";
562 
563  case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
564 
565  case SET_KIND_NONE: return "NONE";
566 
567  default: return "(unknown)";
568  }
569 }
570 
571 static const char *
572 rfnname(uint16_t kind)
573 {
574  switch (kind)
575  {
576  case RFN_KIND_NONE: return "NONE";
577 
578  case RFN_KIND_ECHO: return "ECHO";
579 
580  case RFN_KIND_CONFIRM: return "CONFIRM";
581 
582  default: return "(unknown)";
583  }
584 }
585 
586 static const char *
587 diffname(uint16_t kind)
588 {
589  switch (kind)
590  {
591  case DIFF_KIND_NONE: return "NONE";
592 
593  case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
594 
595  case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
596 
597  case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
598 
599  default: return "(unknown)";
600  }
601 }
602 
603 #ifdef GNUNET_EXTRA_LOGGING
604 
605 
606 static const char *
607 debug_str_element(const struct GNUNET_SET_Element *el)
608 {
609  struct GNUNET_HashCode hash;
610 
611  GNUNET_SET_element_hash(el, &hash);
612 
613  return GNUNET_h2s(&hash);
614 }
615 
616 static const char *
617 debug_str_task_key(struct TaskKey *tk)
618 {
619  static char buf[256];
620 
621  snprintf(buf, sizeof(buf),
622  "TaskKey kind=%s, p1=%d, p2=%d, l=%d, rep=%d",
623  phasename(tk->kind), tk->peer1, tk->peer2,
624  tk->leader, tk->repetition);
625 
626  return buf;
627 }
628 
629 static const char *
630 debug_str_diff_key(struct DiffKey *dk)
631 {
632  static char buf[256];
633 
634  snprintf(buf, sizeof(buf),
635  "DiffKey kind=%s, k1=%d, k2=%d",
636  diffname(dk->diff_kind), dk->k1, dk->k2);
637 
638  return buf;
639 }
640 
641 static const char *
642 debug_str_set_key(const struct SetKey *sk)
643 {
644  static char buf[256];
645 
646  snprintf(buf, sizeof(buf),
647  "SetKey kind=%s, k1=%d, k2=%d",
648  setname(sk->set_kind), sk->k1, sk->k2);
649 
650  return buf;
651 }
652 
653 
654 static const char *
655 debug_str_rfn_key(const struct RfnKey *rk)
656 {
657  static char buf[256];
658 
659  snprintf(buf, sizeof(buf),
660  "RfnKey kind=%s, k1=%d, k2=%d",
661  rfnname(rk->rfn_kind), rk->k1, rk->k2);
662 
663  return buf;
664 }
665 
666 #endif /* GNUNET_EXTRA_LOGGING */
667 
668 
678 static int
680  const struct GNUNET_SET_Element *element)
681 {
682  struct TaskEntry *task = (struct TaskEntry *)cls;
683  struct ConsensusSession *session = task->step->session;
684  struct GNUNET_MQ_Envelope *ev;
685 
686  if (NULL != element)
687  {
689  const struct ConsensusElement *ce;
690 
692  ce = element->data;
693 
694  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "marker is %u\n", (unsigned)ce->marker);
695 
696  if (0 != ce->marker)
697  return GNUNET_YES;
698 
700  "P%d: sending element %s to client\n",
701  session->local_peer_idx,
702  debug_str_element(element));
703 
704  ev = GNUNET_MQ_msg_extra(m, element->size - sizeof(struct ConsensusElement),
706  m->element_type = ce->payload_type;
707  GNUNET_memcpy(&m[1], &ce[1], element->size - sizeof(struct ConsensusElement));
708  GNUNET_MQ_send(session->client_mq, ev);
709  }
710  else
711  {
713  "P%d: finished iterating elements for client\n",
714  session->local_peer_idx);
716  GNUNET_MQ_send(session->client_mq, ev);
717  }
718  return GNUNET_YES;
719 }
720 
721 
/*
 * Look up the set entry stored in a session under the given key.
 *
 * The map key is the hash of the raw bytes of *key.
 * NOTE(review): all sizeof(struct SetKey) bytes take part in the hash,
 * including any padding -- callers presumably zero the key before
 * filling it in; confirm.
 *
 * @param session session whose `setmap` is searched
 * @param key key of the wanted set
 * @return whatever GNUNET_CONTAINER_multihashmap_get() yields for the
 *         hashed key (callers in this file assert it is non-NULL)
 */
722 static struct SetEntry *
723 lookup_set(struct ConsensusSession *session, struct SetKey *key)
724 {
725  struct GNUNET_HashCode hash;
726 
728  "P%u: looking up set {%s}\n",
729  session->local_peer_idx,
730  debug_str_set_key(key));
731 
733  GNUNET_CRYPTO_hash(key, sizeof(struct SetKey), &hash);
734  return GNUNET_CONTAINER_multihashmap_get(session->setmap, &hash);
735 }
736 
737 
/*
 * Look up the diff entry stored in a session under the given key.
 *
 * The map key is the hash of the raw bytes of *key.
 * NOTE(review): all sizeof(struct DiffKey) bytes take part in the hash,
 * including any padding -- callers presumably zero the key before
 * filling it in; confirm.
 *
 * @param session session whose `diffmap` is searched
 * @param key key of the wanted diff
 * @return whatever GNUNET_CONTAINER_multihashmap_get() yields for the
 *         hashed key (callers in this file assert it is non-NULL)
 */
738 static struct DiffEntry *
739 lookup_diff(struct ConsensusSession *session, struct DiffKey *key)
740 {
741  struct GNUNET_HashCode hash;
742 
744  "P%u: looking up diff {%s}\n",
745  session->local_peer_idx,
746  debug_str_diff_key(key));
747 
749  GNUNET_CRYPTO_hash(key, sizeof(struct DiffKey), &hash);
750  return GNUNET_CONTAINER_multihashmap_get(session->diffmap, &hash);
751 }
752 
753 
/*
 * Look up the referendum entry stored in a session under the given key.
 *
 * The map key is the hash of the raw bytes of *key.
 * NOTE(review): all sizeof(struct RfnKey) bytes take part in the hash,
 * including any padding -- callers presumably zero the key before
 * filling it in; confirm.
 *
 * @param session session whose `rfnmap` is searched
 * @param key key of the wanted referendum
 * @return whatever GNUNET_CONTAINER_multihashmap_get() yields for the
 *         hashed key (callers in this file assert it is non-NULL)
 */
754 static struct ReferendumEntry *
755 lookup_rfn(struct ConsensusSession *session, struct RfnKey *key)
756 {
757  struct GNUNET_HashCode hash;
758 
760  "P%u: looking up rfn {%s}\n",
761  session->local_peer_idx,
762  debug_str_rfn_key(key));
763 
765  GNUNET_CRYPTO_hash(key, sizeof(struct RfnKey), &hash);
766  return GNUNET_CONTAINER_multihashmap_get(session->rfnmap, &hash);
767 }
768 
769 
/*
 * Record in a diff that an element is to be added or removed.
 *
 * The element is identified by its hash in diff->changes.  When no
 * record exists yet a new DiffElementInfo holding a duplicate of the
 * element is created.  The weight of the record is then overwritten
 * (not accumulated), so the most recent insert for an element wins.
 *
 * @param diff diff to modify
 * @param weight must be 1 or -1 (asserted); callers in this file pass
 *        1 when an element is added and -1 when it is removed
 * @param element element the change refers to
 */
770 static void
771 diff_insert(struct DiffEntry *diff,
772  int weight,
773  const struct GNUNET_SET_Element *element)
774 {
775  struct DiffElementInfo *di;
776  struct GNUNET_HashCode hash;
777 
778  GNUNET_assert((1 == weight) || (-1 == weight));
779 
781  "diff_insert with element size %u\n",
782  element->size);
783 
785  "hashing element\n");
786 
787  GNUNET_SET_element_hash(element, &hash);
788 
790  "hashed element\n");
791 
792  di = GNUNET_CONTAINER_multihashmap_get(diff->changes, &hash);
793 
794  if (NULL == di)
795  {
796  di = GNUNET_new(struct DiffElementInfo);
797  di->element = GNUNET_SET_element_dup(element);
800  &hash, di,
802  }
803 
 /* last writer wins: an earlier weight for the same element is replaced */
804  di->weight = weight;
805 }
806 
807 
/*
 * Mark a peer as having committed to a referendum.
 * NOTE(review): the line carrying the function name and the `rfn`
 * parameter is absent from this listing; call sites suggest this is
 * rfn_commit(rfn, commit_peer) -- confirm.
 *
 * commit_peer must be smaller than rfn->num_peers (asserted).
 */
808 static void
810  uint16_t commit_peer)
811 {
812  GNUNET_assert(commit_peer < rfn->num_peers);
813 
814  rfn->peer_commited[commit_peer] = GNUNET_YES;
815 }
816 
817 
/*
 * Mark a peer as contested in a referendum.
 * NOTE(review): the line carrying the function name and the `rfn`
 * parameter is absent from this listing; call sites suggest this is
 * rfn_contest(rfn, contested_peer) -- confirm.
 *
 * contested_peer must be smaller than rfn->num_peers (asserted).
 */
818 static void
820  uint16_t contested_peer)
821 {
822  GNUNET_assert(contested_peer < rfn->num_peers);
823 
824  rfn->peer_contested[contested_peer] = GNUNET_YES;
825 }
826 
827 
828 static uint16_t
830 {
831  uint16_t i;
832  uint16_t ret;
833 
834  ret = 0;
835  for (i = 0; i < rfn->num_peers; i++)
836  if ((GNUNET_YES == rfn->peer_commited[i]) && (GNUNET_NO == rfn->peer_contested[i]))
837  ret++;
838 
839  return ret;
840 }
841 
842 
/*
 * Record a peer's explicit vote about one element in a referendum.
 * NOTE(review): the line carrying the function name and the `rfn`
 * parameter, and the line that looks up `ri`, are absent from this
 * listing; call sites suggest this is
 * rfn_vote(rfn, voting_peer, vote, element) -- confirm.
 *
 * When no record for the element exists, one is created with a
 * duplicate of the element and a votes array of rfn->num_peers entries.
 * The record's proposal is overwritten on every call, so it holds the
 * vote passed in the most recent call for that element.
 *
 * voting_peer must be smaller than rfn->num_peers (asserted) and vote
 * must be VOTE_ADD or VOTE_REMOVE (asserted).
 */
843 static void
845  uint16_t voting_peer,
846  enum ReferendumVote vote,
847  const struct GNUNET_SET_Element *element)
848 {
849  struct RfnElementInfo *ri;
850  struct GNUNET_HashCode hash;
851 
852  GNUNET_assert(voting_peer < rfn->num_peers);
853 
854  /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
855  since VOTE_STAY is implicit in not voting. */
856  GNUNET_assert((VOTE_ADD == vote) || (VOTE_REMOVE == vote));
857 
858  GNUNET_SET_element_hash(element, &hash);
860 
861  if (NULL == ri)
862  {
863  ri = GNUNET_new(struct RfnElementInfo);
864  ri->element = GNUNET_SET_element_dup(element);
865  ri->votes = GNUNET_new_array(rfn->num_peers, int);
868  &hash, ri,
870  }
871 
872  ri->votes[voting_peer] = GNUNET_YES;
 /* not per-peer: the latest vote kind replaces any earlier proposal */
873  ri->proposal = vote;
874 }
875 
876 
/*
 * Return the index of the peer on the other end of a two-peer task.
 * NOTE(review): the line carrying the function name and parameter is
 * absent from this listing; call sites suggest this is
 * task_other_peer(task) -- confirm.
 *
 * If the local peer is peer1 of the task key the result is peer2,
 * otherwise it is peer1.  The local index is read from an unsigned int
 * field and narrowed to uint16_t here.
 */
877 static uint16_t
879 {
880  uint16_t me = task->step->session->local_peer_idx;
881 
882  if (task->key.peer1 == me)
883  return task->key.peer2;
884  return task->key.peer1;
885 }
886 
887 
/**
 * Three-way comparison of two uint64_t values, usable as a qsort()
 * comparator for ascending order.
 *
 * The arguments are read through const-qualified pointers; the
 * comparator never needs write access, so the const qualifier of the
 * qsort() interface is preserved instead of being cast away.
 *
 * @param pa pointer to the first uint64_t
 * @param pb pointer to the second uint64_t
 * @return -1 if *pa < *pb, 0 if equal, 1 if *pa > *pb
 */
static int
cmp_uint64_t(const void *pa, const void *pb)
{
  const uint64_t a = *(const uint64_t *)pa;
  const uint64_t b = *(const uint64_t *)pb;

  /* each comparison yields 0 or 1, so the difference is -1, 0 or 1
     without any risk of overflow */
  return (a > b) - (a < b);
}
900 
901 
/*
 * Result callback of a set operation belonging to a task.
 *
 * Depending on what the task's SetOpCls names as outputs, each reported
 * element is applied to an output set, recorded in an output diff
 * and/or recorded as a vote of the other peer in an output referendum.
 * Marker elements (non-zero `marker`) are consumed here and never
 * treated as payload.
 *
 * NOTE(review): this listing lacks several lines of the function: the
 * `status` parameter declaration, the openers of the log calls and all
 * four `case` labels of the switch below.
 *
 * @param cls the TaskEntry the set operation belongs to
 * @param element element reported by the set operation, may be NULL
 * @param current_size stored as session->first_size when a task of
 *        kind PHASE_KIND_ALL_TO_ALL finishes
 */
911 static void
912 set_result_cb(void *cls,
913  const struct GNUNET_SET_Element *element,
914  uint64_t current_size,
916 {
917  struct TaskEntry *task = cls;
918  struct ConsensusSession *session = task->step->session;
919  struct SetEntry *output_set = NULL;
920  struct DiffEntry *output_diff = NULL;
921  struct ReferendumEntry *output_rfn = NULL;
922  unsigned int other_idx;
923  struct SetOpCls *setop;
924  const struct ConsensusElement *consensus_element = NULL;
925 
926  if (NULL != element)
927  {
929  "P%u: got element of type %u, status %u\n",
930  session->local_peer_idx,
931  (unsigned)element->element_type,
932  (unsigned)status);
934  consensus_element = element->data;
935  }
936 
937  setop = &task->cls.setop;
938 
939 
941  "P%u: got set result for {%s}, status %u\n",
942  session->local_peer_idx,
943  debug_str_task_key(&task->key),
944  status);
945 
 /* results for a task that has not started or has already finished
    are reported as a protocol violation and dropped */
946  if (GNUNET_NO == task->is_started)
947  {
948  GNUNET_break_op(0);
949  return;
950  }
951 
952  if (GNUNET_YES == task->is_finished)
953  {
954  GNUNET_break_op(0);
955  return;
956  }
957 
958  other_idx = task_other_peer(task);
959 
 /* resolve the configured outputs; a configured output must exist */
960  if (SET_KIND_NONE != setop->output_set.set_kind)
961  {
962  output_set = lookup_set(session, &setop->output_set);
963  GNUNET_assert(NULL != output_set);
964  }
965 
966  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
967  {
968  output_diff = lookup_diff(session, &setop->output_diff);
969  GNUNET_assert(NULL != output_diff);
970  }
971 
972  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
973  {
974  output_rfn = lookup_rfn(session, &setop->output_rfn);
975  GNUNET_assert(NULL != output_rfn);
976  }
977 
978  if (GNUNET_YES == session->peers_blacklisted[other_idx])
979  {
980  /* Peer might have been blacklisted
981  by a gradecast running in parallel, ignore elements from now */
982  if (GNUNET_SET_STATUS_ADD_LOCAL == status)
983  return;
984  if (GNUNET_SET_STATUS_ADD_REMOTE == status)
985  return;
986  }
987 
 /* every path inside this block returns, so elements with a non-zero
    marker never reach the switch further down */
988  if ((NULL != consensus_element) && (0 != consensus_element->marker))
989  {
991  "P%u: got some marker\n",
992  session->local_peer_idx);
993  if ((GNUNET_YES == setop->transceive_contested) &&
994  (CONSENSUS_MARKER_CONTESTED == consensus_element->marker))
995  {
996  GNUNET_assert(NULL != output_rfn);
997  rfn_contest(output_rfn, task_other_peer(task));
998  return;
999  }
1000 
1001  if (CONSENSUS_MARKER_SIZE == consensus_element->marker)
1002  {
1004  "P%u: got size marker\n",
1005  session->local_peer_idx);
1006 
1007 
 /* NOTE(review): no check that element->size covers a whole
    ConsensusSizeElement is visible before this reinterpretation --
    confirm, the element comes from the other peer */
1008  struct ConsensusSizeElement *cse = (void *)consensus_element;
1009 
 /* a size marker is only accepted from the peer it claims to be from */
1010  if (cse->sender_index == other_idx)
1011  {
1012  if (NULL == session->first_sizes_received)
1013  session->first_sizes_received = GNUNET_new_array(session->num_peers, uint64_t);
1014  session->first_sizes_received[other_idx] = GNUNET_ntohll(cse->size);
1015 
 /* the lower bound is the element at index num_peers / 3 + 1 of the
    ascending-sorted sizes received so far.
    NOTE(review): for num_peers == 1 that index is 1, one past the end
    of the one-element copy -- confirm num_peers is always > 1 here */
1016  uint64_t *copy = GNUNET_memdup(session->first_sizes_received, sizeof(uint64_t) * session->num_peers);
1017  qsort(copy, session->num_peers, sizeof(uint64_t), cmp_uint64_t);
1018  session->lower_bound = copy[session->num_peers / 3 + 1];
 /* NOTE(review): %llu is paired with a (long long) cast below */
1020  "P%u: lower bound %llu\n",
1021  session->local_peer_idx,
1022  (long long)session->lower_bound);
1023  GNUNET_free(copy);
1024  }
1025  return;
1026  }
1027 
1028  return;
1029  }
1030 
1031  switch (status)
1032  {
 /* (case label absent from listing) an element is to be added:
    applied to the output set, diff (+1) and referendum (VOTE_ADD) */
1034  GNUNET_assert(NULL != consensus_element);
1036  "Adding element in Task {%s}\n",
1037  debug_str_task_key(&task->key));
1038  if (NULL != output_set)
1039  {
1040  // FIXME: record pending adds, use callback
1041  GNUNET_SET_add_element(output_set->h,
1042  element,
1043  NULL,
1044  NULL);
1045 #ifdef GNUNET_EXTRA_LOGGING
1047  "P%u: adding element %s into set {%s} of task {%s}\n",
1048  session->local_peer_idx,
1049  debug_str_element(element),
1050  debug_str_set_key(&setop->output_set),
1051  debug_str_task_key(&task->key));
1052 #endif
1053  }
1054  if (NULL != output_diff)
1055  {
1056  diff_insert(output_diff, 1, element);
1057 #ifdef GNUNET_EXTRA_LOGGING
1059  "P%u: adding element %s into diff {%s} of task {%s}\n",
1060  session->local_peer_idx,
1061  debug_str_element(element),
1062  debug_str_diff_key(&setop->output_diff),
1063  debug_str_task_key(&task->key));
1064 #endif
1065  }
1066  if (NULL != output_rfn)
1067  {
1068  rfn_vote(output_rfn, task_other_peer(task), VOTE_ADD, element);
1069 #ifdef GNUNET_EXTRA_LOGGING
1071  "P%u: adding element %s into rfn {%s} of task {%s}\n",
1072  session->local_peer_idx,
1073  debug_str_element(element),
1074  debug_str_rfn_key(&setop->output_rfn),
1075  debug_str_task_key(&task->key));
1076 #endif
1077  }
1078  // XXX: add result to structures in task
1079  break;
1080 
 /* (case label absent from listing) an element is to be removed:
    skipped entirely when the set operation is flagged do_not_remove */
1082  GNUNET_assert(NULL != consensus_element);
1083  if (GNUNET_YES == setop->do_not_remove)
1084  break;
 /* NOTE(review): elements with a non-zero marker already returned
    above, so this test looks unreachable unless
    CONSENSUS_MARKER_CONTESTED is 0 -- confirm */
1085  if (CONSENSUS_MARKER_CONTESTED == consensus_element->marker)
1086  break;
1088  "Removing element in Task {%s}\n",
1089  debug_str_task_key(&task->key));
1090  if (NULL != output_set)
1091  {
1092  // FIXME: record pending adds, use callback
1093  GNUNET_SET_remove_element(output_set->h,
1094  element,
1095  NULL,
1096  NULL);
1097 #ifdef GNUNET_EXTRA_LOGGING
1099  "P%u: removing element %s from set {%s} of task {%s}\n",
1100  session->local_peer_idx,
1101  debug_str_element(element),
1102  debug_str_set_key(&setop->output_set),
1103  debug_str_task_key(&task->key));
1104 #endif
1105  }
1106  if (NULL != output_diff)
1107  {
1108  diff_insert(output_diff, -1, element);
1109 #ifdef GNUNET_EXTRA_LOGGING
1111  "P%u: removing element %s from diff {%s} of task {%s}\n",
1112  session->local_peer_idx,
1113  debug_str_element(element),
1114  debug_str_diff_key(&setop->output_diff),
1115  debug_str_task_key(&task->key));
1116 #endif
1117  }
1118  if (NULL != output_rfn)
1119  {
1120  rfn_vote(output_rfn, task_other_peer(task), VOTE_REMOVE, element);
1121 #ifdef GNUNET_EXTRA_LOGGING
1123  "P%u: removing element %s from rfn {%s} of task {%s}\n",
1124  session->local_peer_idx,
1125  debug_str_element(element),
1126  debug_str_rfn_key(&setop->output_rfn),
1127  debug_str_task_key(&task->key));
1128 #endif
1129  }
1130  break;
1131 
 /* (case label absent from listing) the set operation completed:
    the other peer is marked as committed and the task is finished */
1133  // XXX: check first if any changes to the underlying
1134  // set are still pending
1136  "P%u: Finishing setop in Task {%s} (%u/%u)\n",
1137  session->local_peer_idx,
1138  debug_str_task_key(&task->key),
1139  (unsigned int)task->step->finished_tasks,
1140  (unsigned int)task->step->tasks_len);
1141  if (NULL != output_rfn)
1142  {
1143  rfn_commit(output_rfn, task_other_peer(task));
1144  }
1145  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1146  {
1147  session->first_size = current_size;
1148  }
1149  finish_task(task);
1150  break;
1151 
 /* (case label absent from listing) presumably the failure status:
    the task is finished anyway, without committing the other peer */
1153  // XXX: cleanup
1154  GNUNET_break_op(0);
1155  finish_task(task);
1156  return;
1157 
1158  default:
1159  /* not reached */
1160  GNUNET_assert(0);
1161  }
1162 }
1163 
1164 #ifdef EVIL
1165 
/*
 * Kinds of deliberate misbehaviour a peer can be configured with
 * (only compiled when EVIL is defined).  get_evilness() maps the type
 * keyword of an EVIL_SPEC field to one of these values.
 */
1166 enum EvilnessType {
1167  EVILNESS_NONE, /* behave like a good peer */
1168  EVILNESS_CRAM_ALL, /* keyword "cram-all" */
1169  EVILNESS_CRAM_LEAD, /* keyword "cram-lead" */
1170  EVILNESS_CRAM_ECHO, /* keyword "cram-echo" */
1171  EVILNESS_SLACK, /* keyword "slack" */
1172  EVILNESS_SLACK_A2A, /* keyword "slack-a2a" */
1173 };
1174 
/*
 * Variant of a cramming misbehaviour; parse_evilness_cram_subtype()
 * maps the subtype keyword of an EVIL_SPEC field to one of these.
 */
1175 enum EvilnessSubType {
1176  EVILNESS_SUB_NONE,
1177  EVILNESS_SUB_REPLACEMENT, /* keyword "replace": always generate a new element */
1178  EVILNESS_SUB_NO_REPLACEMENT, /* keyword "noreplace": always cram the same elements */
1179 };
1180 
/*
 * Parsed misbehaviour configuration of the local peer, filled in by
 * get_evilness().  `subtype` and `num` are only assigned there for the
 * cram-* types.
 */
1181 struct Evilness {
1182  enum EvilnessType type;
1183  enum EvilnessSubType subtype;
1184  unsigned int num; /* number of elements to cram (fourth EVIL_SPEC component) */
1185 };
1186 
1187 
/*
 * Translate the subtype keyword of an EVIL_SPEC field.
 *
 * @param evil_subtype_str keyword, "replace" or "noreplace"
 * @param evil structure whose `subtype` is set on success; left
 *        untouched on failure
 * @return GNUNET_OK on success, GNUNET_SYSERR for any other keyword
 *         (a message naming the malformed field is logged)
 */
1188 static int
1189 parse_evilness_cram_subtype(const char *evil_subtype_str, struct Evilness *evil)
1190 {
1191  if (0 == strcmp("replace", evil_subtype_str))
1192  {
1193  evil->subtype = EVILNESS_SUB_REPLACEMENT;
1194  }
1195  else if (0 == strcmp("noreplace", evil_subtype_str))
1196  {
1197  evil->subtype = EVILNESS_SUB_NO_REPLACEMENT;
1198  }
1199  else
1200  {
1202  "Malformed field '%s' in EVIL_SPEC (unknown subtype), behaving like a good peer.\n",
1203  evil_subtype_str);
1204  return GNUNET_SYSERR;
1205  }
1206  return GNUNET_OK;
1207 }
1208 
1209 
/*
 * Determine how the local peer is configured to misbehave.
 *
 * Reads option EVIL_SPEC from section "consensus".  The value is a
 * '/'-separated list of fields of the form
 *   <peer index>;<type>;<subtype>;<number>
 * and the first field whose peer index equals the local peer index
 * decides the result.  If the option is absent, a field is malformed,
 * or no field names the local peer, evil->type is EVILNESS_NONE.
 *
 * The fields are split with strtok(), which modifies evil_spec in
 * place and keeps hidden static state.  The "%m" conversions make
 * sscanf() allocate the two strings, which is why they are released
 * with free() rather than GNUNET_free().
 *
 * NOTE(review): the openers of the log calls are absent from this
 * listing.
 *
 * @param session session providing the local peer index
 * @param evil result, must not be NULL; `subtype` and `num` are only
 *        written for the cram-* types
 */
1210 static void
1211 get_evilness(struct ConsensusSession *session, struct Evilness *evil)
1212 {
1213  char *evil_spec;
1214  char *field;
1215  char *evil_type_str = NULL;
1216  char *evil_subtype_str = NULL;
1217 
1218  GNUNET_assert(NULL != evil);
1219 
1220  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string(cfg, "consensus", "EVIL_SPEC", &evil_spec))
1221  {
1223  "P%u: no evilness\n",
1224  session->local_peer_idx);
1225  evil->type = EVILNESS_NONE;
1226  return;
1227  }
1229  "P%u: got evilness spec\n",
1230  session->local_peer_idx);
1231 
1232  for (field = strtok(evil_spec, "/");
1233  NULL != field;
1234  field = strtok(NULL, "/"))
1235  {
1236  unsigned int peer_num;
1237  unsigned int evil_num;
1238  int ret;
1239 
1240  evil_type_str = NULL;
1241  evil_subtype_str = NULL;
1242 
1243  ret = sscanf(field, "%u;%m[a-z-];%m[a-z-];%u", &peer_num, &evil_type_str, &evil_subtype_str, &evil_num);
1244 
 /* on a short match any string already allocated is released at
    the cleanup label */
1245  if (ret != 4)
1246  {
1248  "Malformed field '%s' in EVIL_SPEC (expected 4 components got %d), behaving like a good peer.\n",
1249  field,
1250  ret);
1251  goto not_evil;
1252  }
1253 
1254  GNUNET_assert(NULL != evil_type_str);
1255  GNUNET_assert(NULL != evil_subtype_str);
1256 
1257  if (peer_num == session->local_peer_idx)
1258  {
1259  if (0 == strcmp("slack", evil_type_str))
1260  {
1261  evil->type = EVILNESS_SLACK;
1262  }
 /* NOTE(review): this `if` is not chained to the one above with
    `else`.  For the keyword "slack" the chain below therefore runs
    to its final `else`, reports an unknown type and jumps to
    not_evil, which resets the type to EVILNESS_NONE -- looks like
    a missing `else`; confirm. */
1263  if (0 == strcmp("slack-a2a", evil_type_str))
1264  {
1265  evil->type = EVILNESS_SLACK_A2A;
1266  }
1267  else if (0 == strcmp("cram-all", evil_type_str))
1268  {
1269  evil->type = EVILNESS_CRAM_ALL;
1270  evil->num = evil_num;
1271  if (GNUNET_OK != parse_evilness_cram_subtype(evil_subtype_str, evil))
1272  goto not_evil;
1273  }
1274  else if (0 == strcmp("cram-lead", evil_type_str))
1275  {
1276  evil->type = EVILNESS_CRAM_LEAD;
1277  evil->num = evil_num;
1278  if (GNUNET_OK != parse_evilness_cram_subtype(evil_subtype_str, evil))
1279  goto not_evil;
1280  }
1281  else if (0 == strcmp("cram-echo", evil_type_str))
1282  {
1283  evil->type = EVILNESS_CRAM_ECHO;
1284  evil->num = evil_num;
1285  if (GNUNET_OK != parse_evilness_cram_subtype(evil_subtype_str, evil))
1286  goto not_evil;
1287  }
1288  else
1289  {
1291  "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n",
1292  evil_type_str);
1293  goto not_evil;
1294  }
1295  goto cleanup;
1296  }
1297  /* No GNUNET_free since memory was allocated by libc */
1298  free(evil_type_str);
1299  evil_type_str = NULL;
 /* NOTE(review): evil_subtype_str is cleared here without being
    freed, so the string sscanf() allocated for a field that belongs
    to another peer appears to leak -- confirm. */
1300  evil_subtype_str = NULL;
1301  }
 /* also reached when the loop ends without a field for this peer */
1302 not_evil:
1303  evil->type = EVILNESS_NONE;
1304 cleanup:
1305  GNUNET_free(evil_spec);
1306  /* no GNUNET_free_non_null since it wasn't
1307  * allocated with GNUNET_malloc */
1308  if (NULL != evil_type_str)
1309  free(evil_type_str);
1310  if (NULL != evil_subtype_str)
1311  free(evil_subtype_str);
1312 }
1313 
1314 #endif
1315 
1316 
1321 static void
1323  struct TaskEntry *task)
1324 {
1325  struct SetEntry *set;
1326  struct SetOpCls *setop = &task->cls.setop;
1327 
1328  GNUNET_assert(NULL != setop->op);
1329  set = lookup_set(session, &setop->input_set);
1330  GNUNET_assert(NULL != set);
1331 
1332  if ((GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested))
1333  {
1334  struct GNUNET_SET_Element element;
1335  struct ConsensusElement ce = { 0 };
1337  element.data = &ce;
1338  element.size = sizeof(struct ConsensusElement);
1340  GNUNET_SET_add_element(set->h, &element, NULL, NULL);
1341  }
1342 
1343  if (PHASE_KIND_ALL_TO_ALL_2 == task->key.kind)
1344  {
1345  struct GNUNET_SET_Element element;
1346  struct ConsensusSizeElement cse = {
1347  .size = 0,
1348  .sender_index = 0
1349  };
1350  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "inserting size marker\n");
1352  cse.size = GNUNET_htonll(session->first_size);
1353  cse.sender_index = session->local_peer_idx;
1354  element.data = &cse;
1355  element.size = sizeof(struct ConsensusSizeElement);
1357  GNUNET_SET_add_element(set->h, &element, NULL, NULL);
1358  }
1359 
1360 #ifdef EVIL
1361  {
1362  unsigned int i;
1363  struct Evilness evil;
1364 
1365  get_evilness(session, &evil);
1366  if (EVILNESS_NONE != evil.type)
1367  {
1368  /* Useful for evaluation */
1369  GNUNET_STATISTICS_set(statistics,
1370  "is evil",
1371  1,
1372  GNUNET_NO);
1373  }
1374  switch (evil.type)
1375  {
1376  case EVILNESS_CRAM_ALL:
1377  case EVILNESS_CRAM_LEAD:
1378  case EVILNESS_CRAM_ECHO:
1379  /* We're not cramming elements in the
1380  all-to-all round, since that would just
1381  add more elements to the result set, but
1382  wouldn't test robustness. */
1383  if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
1384  {
1385  GNUNET_SET_commit(setop->op, set->h);
1386  break;
1387  }
1388  if ((EVILNESS_CRAM_LEAD == evil.type) &&
1389  ((PHASE_KIND_GRADECAST_LEADER != task->key.kind) || SET_KIND_CURRENT != set->key.set_kind))
1390  {
1391  GNUNET_SET_commit(setop->op, set->h);
1392  break;
1393  }
1394  if (EVILNESS_CRAM_ECHO == evil.type && (PHASE_KIND_GRADECAST_ECHO != task->key.kind))
1395  {
1396  GNUNET_SET_commit(setop->op, set->h);
1397  break;
1398  }
1399  for (i = 0; i < evil.num; i++)
1400  {
1401  struct GNUNET_SET_Element element;
1402  struct ConsensusStuffedElement se = {
1403  .ce.payload_type = 0,
1404  .ce.marker = 0,
1405  };
1406  element.data = &se;
1407  element.size = sizeof(struct ConsensusStuffedElement);
1409 
1410  if (EVILNESS_SUB_REPLACEMENT == evil.subtype)
1411  {
1412  /* Always generate a new element. */
1414  }
1415  else if (EVILNESS_SUB_NO_REPLACEMENT == evil.subtype)
1416  {
1417  /* Always cram the same elements, derived from counter. */
1418  GNUNET_CRYPTO_hash(&i, sizeof(i), &se.rand);
1419  }
1420  else
1421  {
1422  GNUNET_assert(0);
1423  }
1424  GNUNET_SET_add_element(set->h, &element, NULL, NULL);
1425 #ifdef GNUNET_EXTRA_LOGGING
1427  "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
1428  session->local_peer_idx,
1429  debug_str_element(&element),
1430  debug_str_set_key(&setop->input_set),
1431  debug_str_task_key(&task->key));
1432 #endif
1433  }
1434  GNUNET_STATISTICS_update(statistics,
1435  "# stuffed elements",
1436  evil.num,
1437  GNUNET_NO);
1438  GNUNET_SET_commit(setop->op, set->h);
1439  break;
1440 
1441  case EVILNESS_SLACK:
1443  "P%u: evil peer: slacking\n",
1444  (unsigned int)session->local_peer_idx);
1445 
1446  /* Do nothing. */
1447  case EVILNESS_SLACK_A2A:
1448  if ((PHASE_KIND_ALL_TO_ALL_2 == task->key.kind) ||
1449  (PHASE_KIND_ALL_TO_ALL == task->key.kind))
1450  {
1451  struct GNUNET_SET_Handle *empty_set;
1453  GNUNET_SET_commit(setop->op, empty_set);
1454  GNUNET_SET_destroy(empty_set);
1455  }
1456  else
1457  {
1458  GNUNET_SET_commit(setop->op, set->h);
1459  }
1460  break;
1461 
1462  case EVILNESS_NONE:
1463  GNUNET_SET_commit(setop->op, set->h);
1464  break;
1465  }
1466  }
1467 #else
1468  if (GNUNET_NO == session->peers_blacklisted[task_other_peer(task)])
1469  {
1470  GNUNET_SET_commit(setop->op, set->h);
1471  }
1472  else
1473  {
1474  /* For our testcases, we don't want the blacklisted
1475  peers to wait. */
1477  setop->op = NULL;
1478  finish_task(task);
1479  }
1480 #endif
1481 }
1482 
1483 
/*
 * Store a diff entry in the session's diffmap, keyed by the hash of
 * the raw bytes of its DiffKey (the same derivation lookup_diff uses).
 * NOTE(review): the map option passed to the put call is not visible
 * in this listing, so the behaviour for an already present key cannot
 * be stated here.
 *
 * @param session session owning the map
 * @param diff entry to store, must not be NULL (asserted)
 */
1484 static void
1485 put_diff(struct ConsensusSession *session,
1486  struct DiffEntry *diff)
1487 {
1488  struct GNUNET_HashCode hash;
1489 
1490  GNUNET_assert(NULL != diff);
1491 
1492  GNUNET_CRYPTO_hash(&diff->key, sizeof(struct DiffKey), &hash);
1494  GNUNET_CONTAINER_multihashmap_put(session->diffmap, &hash, diff,
1496 }
1497 
/*
 * Store a set entry in the session's setmap, keyed by the hash of the
 * raw bytes of its SetKey (the same derivation lookup_set uses).
 * NOTE(review): the map option passed to the put call is not visible
 * in this listing, so the behaviour for an already present key cannot
 * be stated here.
 *
 * @param session session owning the map
 * @param set entry to store; its set handle `h` must already be
 *        non-NULL (asserted)
 */
1498 static void
1499 put_set(struct ConsensusSession *session,
1500  struct SetEntry *set)
1501 {
1502  struct GNUNET_HashCode hash;
1503 
1504  GNUNET_assert(NULL != set->h);
1505 
1507  "Putting set %s\n",
1508  debug_str_set_key(&set->key));
1509 
1510  GNUNET_CRYPTO_hash(&set->key, sizeof(struct SetKey), &hash);
1512  GNUNET_CONTAINER_multihashmap_put(session->setmap, &hash, set,
1514 }
1515 
1516 
/*
 * Store a referendum entry in the session's rfnmap, keyed by the hash
 * of the raw bytes of its RfnKey (the same derivation lookup_rfn uses).
 * NOTE(review): the map option passed to the put call is not visible
 * in this listing, so the behaviour for an already present key cannot
 * be stated here.
 *
 * @param session session owning the map
 * @param rfn entry to store
 */
1517 static void
1518 put_rfn(struct ConsensusSession *session,
1519  struct ReferendumEntry *rfn)
1520 {
1521  struct GNUNET_HashCode hash;
1522 
1523  GNUNET_CRYPTO_hash(&rfn->key, sizeof(struct RfnKey), &hash);
1525  GNUNET_CONTAINER_multihashmap_put(session->rfnmap, &hash, rfn,
1527 }
1528 
1529 
1530 
1531 static void
1533 {
1534  /* not implemented yet */
1535  GNUNET_assert(0);
1536 }
1537 
1538 
1539 static void
1541  struct ReferendumEntry *rfn,
1542  uint16_t voting_peer,
1543  uint16_t num_peers)
1544 {
1546  struct DiffElementInfo *di;
1547 
1549 
1550  while (GNUNET_YES ==
1552  NULL,
1553  (const void **)&di))
1554  {
1555  if (di->weight > 0)
1556  {
1557  rfn_vote(rfn, voting_peer, VOTE_ADD, di->element);
1558  }
1559  if (di->weight < 0)
1560  {
1561  rfn_vote(rfn, voting_peer, VOTE_REMOVE, di->element);
1562  }
1563  }
1564 
1566 }
1567 
1568 
/*
 * Allocate a new diff entry and return it to the caller.
 * NOTE(review): the line carrying the function name and the line that
 * presumably initialises the entry's `changes` map are absent from
 * this listing; the call in diff_compose suggests this is
 * diff_create(void) -- confirm.
 */
1569 struct DiffEntry *
1571 {
1572  struct DiffEntry *d = GNUNET_new(struct DiffEntry);
1573 
1575 
1576  return d;
1577 }
1578 
1579 
/*
 * Build a new diff out of two existing diffs.
 *
 * A fresh diff is created and every element record yielded by two
 * successive iterations is inserted into it with diff_insert().
 * Because diff_insert() overwrites the weight of an existing record,
 * an element yielded by both iterations ends up with the weight from
 * the second one.
 * NOTE(review): the declaration of `iter` and the lines that create
 * and destroy the iterators are absent from this listing; presumably
 * the first loop walks diff_1 and the second diff_2 -- confirm.
 * The function is not declared static.
 *
 * @param diff_1 first input diff
 * @param diff_2 second input diff
 * @return the newly created diff
 */
1580 struct DiffEntry *
1581 diff_compose(struct DiffEntry *diff_1,
1582  struct DiffEntry *diff_2)
1583 {
1584  struct DiffEntry *diff_new;
1586  struct DiffElementInfo *di;
1587 
1588  diff_new = diff_create();
1589 
1591  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next(iter, NULL, (const void **)&di))
1592  {
1593  diff_insert(diff_new, di->weight, di->element);
1594  }
1596 
1598  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next(iter, NULL, (const void **)&di))
1599  {
1600  diff_insert(diff_new, di->weight, di->element);
1601  }
1603 
1604  return diff_new;
1605 }
1606 
1607 
/*
 * Allocate a referendum entry for a given number of peers.
 *
 * The `peer_commited` and `peer_contested` arrays get one int per peer
 * and `num_peers` is set to size.
 * NOTE(review): one line between the allocation of the entry and of
 * the arrays is absent from this listing (presumably the creation of
 * the element map) -- confirm.  The function is not declared static.
 *
 * @param size number of peers taking part in the referendum
 * @return the newly allocated entry
 */
1608 struct ReferendumEntry *
1609 rfn_create(uint16_t size)
1610 {
1611  struct ReferendumEntry *rfn;
1612 
1613  rfn = GNUNET_new(struct ReferendumEntry);
1615  rfn->peer_commited = GNUNET_new_array(size, int);
1616  rfn->peer_contested = GNUNET_new_array(size, int);
1617  rfn->num_peers = size;
1618 
1619  return rfn;
1620 }
1621 
1622 
1623 #if UNUSED
/*
 * Release a diff entry (compiled only when UNUSED is non-zero).
 * NOTE(review): one line before the GNUNET_free() is absent from this
 * listing; whether the stored element records are released as well
 * cannot be told from here.
 */
1624 static void
1625 diff_destroy(struct DiffEntry *diff)
1626 {
1628  GNUNET_free(diff);
1629 }
1630 #endif
1631 
1632 
/*
 * Evaluate the outcome of a referendum for one element.
 *
 * Only peers marked as committed are counted.  If strictly more than
 * half of them (integer division) voted for the element's proposal,
 * the proposal wins and the majority is the number of yes-votes.
 * Otherwise the result is VOTE_STAY and the majority is the number of
 * committed peers that did not vote yes.  Contested peers are not
 * filtered out here.
 *
 * NOTE(review): the loop counter is uint16_t while rfn->num_peers is
 * unsigned int; presumably num_peers always fits in 16 bits -- confirm.
 *
 * @param rfn referendum providing the committed flags and peer count
 * @param ri per-element vote record
 * @param ret_majority set to the size of the winning side
 * @param ret_vote set to the winning vote
 */
1638 static void
1639 rfn_majority(const struct ReferendumEntry *rfn,
1640  const struct RfnElementInfo *ri,
1641  uint16_t *ret_majority,
1642  enum ReferendumVote *ret_vote)
1643 {
1644  uint16_t votes_yes = 0;
1645  uint16_t num_commited = 0;
1646  uint16_t i;
1647 
1649  "Computing rfn majority for element %s of rfn {%s}\n",
1650  debug_str_element(ri->element),
1651  debug_str_rfn_key(&rfn->key));
1652 
1653  for (i = 0; i < rfn->num_peers; i++)
1654  {
1655  if (GNUNET_NO == rfn->peer_commited[i])
1656  continue;
1657  num_commited++;
1658 
1659  if (GNUNET_YES == ri->votes[i])
1660  votes_yes++;
1661  }
1662 
 /* a tie (exactly half) is not a majority for the proposal */
1663  if (votes_yes > (num_commited) / 2)
1664  {
1665  *ret_vote = ri->proposal;
1666  *ret_majority = votes_yes;
1667  }
1668  else
1669  {
1670  *ret_vote = VOTE_STAY;
1671  *ret_majority = num_commited - votes_yes;
1672  }
1673 }
1674 
1675 
/*
 * Closure handed to set_copy_cb() through GNUNET_SET_copy_lazy().
 * It is heap-allocated by the code that requests the copy and freed
 * inside set_copy_cb().
 */
1676 struct SetCopyCls {
1677  struct TaskEntry *task; /* task whose start function runs once the copy exists */
1678  struct SetKey dst_set_key; /* key under which the copy is stored in the session */
1679 };
1680 
1681 
/*
 * Callback invoked with the copy of a set requested for a task.
 *
 * The copy is wrapped in a SetHandle and attached to the session's
 * list of set handles.  It is also stored as a new SetEntry under the
 * destination key carried in the closure.  Finally the task's start
 * function is called.  The closure is freed here, after its fields
 * have been copied into locals.
 * NOTE(review): the first line of the list-insertion call is absent
 * from this listing.  No check of `copy` against NULL is visible here;
 * put_set() asserts that the handle is non-NULL.
 *
 * @param cls a heap-allocated struct SetCopyCls, released by this function
 * @param copy handle of the copied set
 */
1682 static void
1683 set_copy_cb(void *cls, struct GNUNET_SET_Handle *copy)
1684 {
1685  struct SetCopyCls *scc = cls;
1686  struct TaskEntry *task = scc->task;
1687  struct SetKey dst_set_key = scc->dst_set_key;
1688  struct SetEntry *set;
1689  struct SetHandle *sh = GNUNET_new(struct SetHandle);
1690 
1691  sh->h = copy;
1693  task->step->session->set_handles_tail,
1694  sh);
1695 
1696  GNUNET_free(scc);
1697  set = GNUNET_new(struct SetEntry);
1698  set->h = copy;
1699  set->key = dst_set_key;
1700  put_set(task->step->session, set);
1701 
1702  task->start(task);
1703 }
1704 
1705 
/**
 * Start copying the set stored under @a src_set_key so that the copy ends
 * up under @a dst_set_key.
 *
 * The source set must exist (asserted).  The copy is requested with
 * GNUNET_SET_copy_lazy; set_copy_cb receives a heap-allocated closure
 * holding the task and the destination key.
 * NOTE(review): listing lines 1711 (function name and first parameter;
 * the body uses `task`) and 1718 (first line of the log call) are absent
 * from this excerpt.
 *
 * @param src_set_key key of the set to copy
 * @param dst_set_key key to store the copy under
 */
1710 static void
1712  struct SetKey *src_set_key,
1713  struct SetKey *dst_set_key)
1714 {
1715  struct SetEntry *src_set;
1716  struct SetCopyCls *scc = GNUNET_new(struct SetCopyCls);
1717 
1719  "Copying set {%s} to {%s} for task {%s}\n",
1720  debug_str_set_key(src_set_key),
1721  debug_str_set_key(dst_set_key),
1722  debug_str_task_key(&task->key));
1723 
1724  scc->task = task;
1725  scc->dst_set_key = *dst_set_key;
1726  src_set = lookup_set(task->step->session, src_set_key);
1727  GNUNET_assert(NULL != src_set);
1728  GNUNET_SET_copy_lazy(src_set->h,
1729  set_copy_cb,
1730  scc);
1731 }
1732 
1733 
1739  struct TaskEntry *task;
1740 };
1741 
1742 
/**
 * Account for one completed set mutation.
 *
 * Decrements the pending counter of the progress closure (which must be
 * positive); once it reaches zero the closure is freed and the
 * associated task is finished.
 * NOTE(review): listing line 1744 (function name and parameter list) is
 * absent from this excerpt; the body reads a closure named `cls`.
 */
1743 static void
1745 {
1746  struct SetMutationProgressCls *pc = cls;
1747 
1748  GNUNET_assert(pc->num_pending > 0);
1749 
1750  pc->num_pending--;
1751 
1752  if (0 == pc->num_pending)
1753  {
      /* Save the task first: pc is freed before the task is finished. */
1754  struct TaskEntry *task = pc->task;
1755  GNUNET_free(pc);
1756  finish_task(task);
1757  }
1758 }
1759 
1760 
/**
 * Mark a step as finished ahead of time, if that is permitted.
 *
 * Does nothing when the step is running, already finished, or not
 * flagged `early_finishable`.  Otherwise the step is marked finished,
 * every subordinate's `pending_prereq` counter is decremented, and
 * run_ready_steps is called for the session.
 * NOTE(review): listing lines 1762 (function name and parameters), 1776,
 * 1786 (log call openers) and 1791 are absent from this excerpt.
 */
1761 static void
1763 {
1764  unsigned int i;
1765 
1766  if (GNUNET_YES == step->is_running)
1767  return;
1768  if (GNUNET_YES == step->is_finished)
1769  return;
1770  if (GNUNET_NO == step->early_finishable)
1771  return;
1772 
1773  step->is_finished = GNUNET_YES;
1774 
1775 #ifdef GNUNET_EXTRA_LOGGING
1777  "Finishing step `%s' early.\n",
1778  step->debug_name);
1779 #endif
1780 
      /* Release the steps that were waiting on this one. */
1781  for (i = 0; i < step->subordinates_len; i++)
1782  {
1783  GNUNET_assert(step->subordinates[i]->pending_prereq > 0);
1784  step->subordinates[i]->pending_prereq--;
1785 #ifdef GNUNET_EXTRA_LOGGING
1787  "Decreased pending_prereq to %u for step `%s'.\n",
1788  (unsigned int)step->subordinates[i]->pending_prereq,
1789  step->subordinates[i]->debug_name);
1790 #endif
1792  }
1793 
1794  // XXX: maybe schedule as task to avoid recursion?
1795  run_ready_steps(step->session);
1796 }
1797 
1798 
/**
 * Finish a step whose tasks have all completed.
 *
 * Asserts that the finished-task count equals the task count, decrements
 * `pending_prereq` on every subordinate step, marks the step finished and
 * calls run_ready_steps for the session.
 * NOTE(review): listing lines 1800 (function name and parameters),
 * 1805-1806, 1809 and 1820 are absent from this excerpt.
 */
1799 static void
1801 {
1802  unsigned int i;
1803 
1804  GNUNET_assert(step->finished_tasks == step->tasks_len);
1807 
1808 #ifdef GNUNET_EXTRA_LOGGING
1810  "All tasks of step `%s' with %u subordinates finished.\n",
1811  step->debug_name,
1812  step->subordinates_len);
1813 #endif
1814 
      /* Release the steps that were waiting on this one. */
1815  for (i = 0; i < step->subordinates_len; i++)
1816  {
1817  GNUNET_assert(step->subordinates[i]->pending_prereq > 0);
1818  step->subordinates[i]->pending_prereq--;
1819 #ifdef GNUNET_EXTRA_LOGGING
1821  "Decreased pending_prereq to %u for step `%s'.\n",
1822  (unsigned int)step->subordinates[i]->pending_prereq,
1823  step->subordinates[i]->debug_name);
1824 #endif
1825  }
1826 
1827  step->is_finished = GNUNET_YES;
1828 
1829  // XXX: maybe schedule as task to avoid recursion?
1830  run_ready_steps(step->session);
1831 }
1832 
1833 
1834 
/**
 * Apply the gradecast result of one repetition to the current set.
 *
 * The output set is keyed (SET_KIND_CURRENT, repetition + 1).  If it does
 * not exist yet, a copy of (SET_KIND_CURRENT, repetition) is requested
 * and the function returns; the copy callback invokes the task's start
 * function again.  Otherwise, for every element visited by the loop, the
 * majority is computed with rfn_majority on the
 * (RFN_KIND_GRADECAST_RESULT, repetition) referendum: VOTE_ADD and
 * VOTE_REMOVE queue a mutation on the output set (counted in
 * `num_pending`), VOTE_STAY changes nothing.  The smallest majority seen
 * is then compared with (num_peers / 3) * 2 to drive early stopping.
 *
 * NOTE(review): many listing lines are absent from this excerpt (1842
 * function name, 1850, 1872 and 1875 iterator handling, the openers of
 * the mutation/log calls, 1944-1945, 1950, 1953, 1983), so iterator
 * setup, the mutation callback and the early-stopping state changes are
 * not described here.
 * NOTE(review): if the loop visits no element, `worst_majority` keeps its
 * initial UINT16_MAX and therefore satisfies the threshold test — confirm
 * this is intended.
 * NOTE(review): when no mutation is pending, finish_task runs before the
 * early-stopping block below, which still reads `session` — confirm the
 * session cannot be torn down by finish_task.
 */
1841 static void
1843 {
1844  struct ConsensusSession *session = task->step->session;
1845  struct SetKey sk_in;
1846  struct SetKey sk_out;
1847  struct RfnKey rk_in;
1848  struct SetEntry *set_out;
1849  struct ReferendumEntry *rfn_in;
1851  struct RfnElementInfo *ri;
1852  struct SetMutationProgressCls *progress_cls;
1853  uint16_t worst_majority = UINT16_MAX;
1854 
1855  sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
1856  rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
1857  sk_out = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition + 1 };
1858 
1859  set_out = lookup_set(session, &sk_out);
1860  if (NULL == set_out)
1861  {
      /* Output set missing: request a copy of the input set and return. */
1862  create_set_copy_for_task(task, &sk_in, &sk_out);
1863  return;
1864  }
1865 
1866  rfn_in = lookup_rfn(session, &rk_in);
1867  GNUNET_assert(NULL != rfn_in);
1868 
1869  progress_cls = GNUNET_new(struct SetMutationProgressCls);
1870  progress_cls->task = task;
1871 
1873 
1874  while (GNUNET_YES ==
1876  NULL,
1877  (const void **)&ri))
1878  {
1879  uint16_t majority_num;
1880  enum ReferendumVote majority_vote;
1881 
1882  rfn_majority(rfn_in, ri, &majority_num, &majority_vote);
1883 
      /* Track the weakest majority over all elements. */
1884  if (worst_majority > majority_num)
1885  worst_majority = majority_num;
1886 
1887  switch (majority_vote)
1888  {
1889  case VOTE_ADD:
1890  progress_cls->num_pending++;
1892  GNUNET_SET_add_element(set_out->h,
1893  ri->element,
1895  progress_cls));
1897  "P%u: apply round: adding element %s with %u-majority.\n",
1898  session->local_peer_idx,
1899  debug_str_element(ri->element), majority_num);
1900  break;
1901 
1902  case VOTE_REMOVE:
1903  progress_cls->num_pending++;
1905  GNUNET_SET_remove_element(set_out->h,
1906  ri->element,
1908  progress_cls));
1910  "P%u: apply round: deleting element %s with %u-majority.\n",
1911  session->local_peer_idx,
1912  debug_str_element(ri->element), majority_num);
1913  break;
1914 
1915  case VOTE_STAY:
1917  "P%u: apply round: keeping element %s with %u-majority.\n",
1918  session->local_peer_idx,
1919  debug_str_element(ri->element), majority_num);
1920  // do nothing
1921  break;
1922 
1923  default:
1924  GNUNET_assert(0);
1925  break;
1926  }
1927  }
1928 
1929  if (0 == progress_cls->num_pending)
1930  {
1931  // call closure right now, no pending ops
1932  GNUNET_free(progress_cls);
1933  finish_task(task);
1934  }
1935 
1936  {
      /* Threshold is two times a third of the peers (integer division). */
1937  uint16_t thresh = (session->num_peers / 3) * 2;
1938 
1939  if (worst_majority >= thresh)
1940  {
1941  switch (session->early_stopping)
1942  {
1943  case EARLY_STOPPING_NONE:
1946  "P%u: Stopping early (after one more superround)\n",
1947  session->local_peer_idx);
1948  break;
1949 
1951  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "P%u: finishing steps due to early finish\n",
1952  session->local_peer_idx);
1954  {
1955  struct Step *step;
1956  for (step = session->steps_head; NULL != step; step = step->next)
1957  try_finish_step_early(step);
1958  }
1959  break;
1960 
1961  case EARLY_STOPPING_DONE:
1962  /* We shouldn't be here anymore after early stopping */
1963  GNUNET_break(0);
1964  break;
1965 
1966  default:
1967  GNUNET_assert(0);
1968  break;
1969  }
1970  }
1971  else if (EARLY_STOPPING_NONE != session->early_stopping)
1972  {
1973  // Our assumption about the number of bad peers
1974  // has been broken.
1975  GNUNET_break_op(0);
1976  }
1977  else
1978  {
1979  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "P%u: NOT finishing early (majority not good enough)\n",
1980  session->local_peer_idx);
1981  }
1982  }
1984 }
1985 
1986 
/**
 * Grade the gradecast of one leader and record the outcome.
 *
 * Looks up (or creates) the (RFN_KIND_GRADECAST_RESULT, repetition)
 * referendum, applies the leader's DIFF_KIND_LEADER_PROPOSAL diff to it,
 * and then, for every element visited by the loop, evaluates the
 * majority on the leader's RFN_KIND_ECHO referendum.  A majority of at
 * most num_peers / 3 is turned into VOTE_REMOVE; VOTE_ADD / VOTE_REMOVE
 * are recorded in the output referendum on behalf of the leader.
 *
 * The confidence starts at 2, drops to at most 1 when fewer than
 * (num_peers / 3) * 2 peers are non-contested, and to 0 when fewer than
 * num_peers / 3 + 1 are.  With confidence >= 1 the leader is committed in
 * the output referendum; with confidence <= 1 the leader is blacklisted.
 * The task is finished before returning.
 *
 * NOTE(review): listing lines 1988 (function name and parameters), 1996,
 * 2017, 2022 (iterator handling) and 2053 are absent from this excerpt.
 */
1987 static void
1989 {
1990  struct ConsensusSession *session = task->step->session;
1991  struct ReferendumEntry *output_rfn;
1992  struct ReferendumEntry *input_rfn;
1993  struct DiffEntry *input_diff;
1994  struct RfnKey rfn_key;
1995  struct DiffKey diff_key;
1997  struct RfnElementInfo *ri;
1998  unsigned int gradecast_confidence = 2;
1999 
2000  rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
2001  output_rfn = lookup_rfn(session, &rfn_key);
2002  if (NULL == output_rfn)
2003  {
2004  output_rfn = rfn_create(session->num_peers);
2005  output_rfn->key = rfn_key;
2006  put_rfn(session, output_rfn);
2007  }
2008 
2009  diff_key = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2010  input_diff = lookup_diff(session, &diff_key);
2011  GNUNET_assert(NULL != input_diff);
2012 
2013  rfn_key = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2014  input_rfn = lookup_rfn(session, &rfn_key);
2015  GNUNET_assert(NULL != input_rfn);
2016 
2018 
2019  apply_diff_to_rfn(input_diff, output_rfn, task->key.leader, session->num_peers);
2020 
2021  while (GNUNET_YES ==
2023  NULL,
2024  (const void **)&ri))
2025  {
2026  uint16_t majority_num;
2027  enum ReferendumVote majority_vote;
2028 
2029  // XXX: we need contested votes and non-contested votes here
2030  rfn_majority(input_rfn, ri, &majority_num, &majority_vote);
2031 
      /* Too small a majority: override the vote with a removal. */
2032  if (majority_num <= session->num_peers / 3)
2033  majority_vote = VOTE_REMOVE;
2034 
2035  switch (majority_vote)
2036  {
2037  case VOTE_STAY:
2038  break;
2039 
2040  case VOTE_ADD:
2041  rfn_vote(output_rfn, task->key.leader, VOTE_ADD, ri->element);
2042  break;
2043 
2044  case VOTE_REMOVE:
2045  rfn_vote(output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
2046  break;
2047 
2048  default:
2049  GNUNET_assert(0);
2050  break;
2051  }
2052  }
2054 
2055  {
2056  uint16_t noncontested;
2057  noncontested = rfn_noncontested(input_rfn);
2058  if (noncontested < (session->num_peers / 3) * 2)
2059  {
2060  gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
2061  }
2062  if (noncontested < (session->num_peers / 3) + 1)
2063  {
2064  gradecast_confidence = 0;
2065  }
2066  }
2067 
2068  if (gradecast_confidence >= 1)
2069  rfn_commit(output_rfn, task->key.leader);
2070 
      /* Confidence 1 both commits (above) and blacklists the leader. */
2071  if (gradecast_confidence <= 1)
2072  session->peers_blacklisted[task->key.leader] = GNUNET_YES;
2073 
2074  finish_task(task);
2075 }
2076 
2077 
/**
 * Start a set-reconciliation task between two peers.
 *
 * The input set must exist with a valid handle.  Missing outputs are
 * created first: a missing output set is obtained by copying the input
 * set (the function returns and is re-entered from the copy callback),
 * a missing output referendum or diff is created empty.  Then:
 *  - if both peers of the task key are the local peer, the task is
 *    finished immediately;
 *  - if the local peer is peer1, a round-context message is filled from
 *    the task key, a set operation towards peer2 is prepared and
 *    commit_set is called;
 *  - if the local peer is peer2, the function only commits when an
 *    operation is already attached to the task;
 *  - any other combination trips an assertion.
 *
 * NOTE(review): listing lines 2079 (function name and parameters), 2143
 * (declaration of `rcm`), 2145, 2150, 2166 and 2174 are absent from this
 * excerpt.
 */
2078 static void
2080 {
2081  struct SetEntry *input;
2082  struct SetOpCls *setop = &task->cls.setop;
2083  struct ConsensusSession *session = task->step->session;
2084 
2085  input = lookup_set(session, &setop->input_set);
2086  GNUNET_assert(NULL != input);
2087  GNUNET_assert(NULL != input->h);
2088 
2089  /* We create the outputs for the operation here
2090  (rather than in the set operation callback)
2091  because we want something valid in there, even
2092  if the other peer doesn't talk to us */
2093 
2094  if (SET_KIND_NONE != setop->output_set.set_kind)
2095  {
2096  /* If we don't have an existing output set,
2097  we clone the input set. */
2098  if (NULL == lookup_set(session, &setop->output_set))
2099  {
2100  create_set_copy_for_task(task, &setop->input_set, &setop->output_set);
2101  return;
2102  }
2103  }
2104 
2105  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
2106  {
2107  if (NULL == lookup_rfn(session, &setop->output_rfn))
2108  {
2109  struct ReferendumEntry *rfn;
2110 
2112  "P%u: output rfn <%s> missing, creating.\n",
2113  session->local_peer_idx,
2114  debug_str_rfn_key(&setop->output_rfn));
2115 
2116  rfn = rfn_create(session->num_peers);
2117  rfn->key = setop->output_rfn;
2118  put_rfn(session, rfn);
2119  }
2120  }
2121 
2122  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
2123  {
2124  if (NULL == lookup_diff(session, &setop->output_diff))
2125  {
2126  struct DiffEntry *diff;
2127 
2128  diff = diff_create();
2129  diff->key = setop->output_diff;
2130  put_diff(session, diff);
2131  }
2132  }
2133 
2134  if ((task->key.peer1 == session->local_peer_idx) && (task->key.peer2 == session->local_peer_idx))
2135  {
2136  /* XXX: mark the corresponding rfn as commited if necessary */
2137  finish_task(task);
2138  return;
2139  }
2140 
2141  if (task->key.peer1 == session->local_peer_idx)
2142  {
2144 
2146  "P%u: Looking up set {%s} to run remote union\n",
2147  session->local_peer_idx,
2148  debug_str_set_key(&setop->input_set));
2149 
2151  rcm.header.size = htons(sizeof(struct GNUNET_CONSENSUS_RoundContextMessage));
2152 
      /* The task key is sent in network byte order. */
2153  rcm.kind = htons(task->key.kind);
2154  rcm.peer1 = htons(task->key.peer1);
2155  rcm.peer2 = htons(task->key.peer2);
2156  rcm.leader = htons(task->key.leader);
2157  rcm.repetition = htons(task->key.repetition);
2158  rcm.is_contested = htons(0);
2159 
2160  GNUNET_assert(NULL == setop->op);
2161  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
2162  session->local_peer_idx, task->key.peer2, debug_str_set_key(&setop->input_set));
2163 
2164  struct GNUNET_SET_Option opts[] = {
2165  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2167  };
2168 
2169  // XXX: maybe this should be done while
2170  // setting up tasks already?
2171  setop->op = GNUNET_SET_prepare(&session->peers[task->key.peer2],
2172  &session->global_id,
2173  &rcm.header,
2175  opts,
2176  set_result_cb,
2177  task);
2178 
2179  commit_set(session, task);
2180  }
2181  else if (task->key.peer2 == session->local_peer_idx)
2182  {
2183  /* Wait for the other peer to contact us */
2184  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
2185  session->local_peer_idx, task->key.peer1);
2186 
      /* An operation is already attached: commit right away. */
2187  if (NULL != setop->op)
2188  {
2189  commit_set(session, task);
2190  }
2191  }
2192  else
2193  {
2194  /* We made an error while constructing the task graph. */
2195  GNUNET_assert(0);
2196  }
2197 }
2198 
2199 
/**
 * Evaluate the echo referendum of one leader and build the echo result.
 *
 * The output set is keyed (SET_KIND_ECHO_RESULT, repetition, leader); if
 * it does not exist yet, a copy of the leader-proposal set is requested
 * and the function returns (it is re-entered from the copy callback).
 * Once present, the same set handle is additionally registered under
 * SET_KIND_LAST_GRADECAST.  For every element visited by the loop the
 * majority on the (RFN_KIND_ECHO, repetition, leader) referendum is
 * computed: a majority below num_peers / 3 marks the output set as
 * contested, and VOTE_ADD / VOTE_REMOVE queue a mutation on the output
 * set.  If no mutation was queued the task finishes immediately.
 *
 * NOTE(review): listing lines 2201 (function name and parameters), 2203
 * (declaration of `iter`), 2232, 2244, 2248 (iterator handling), the
 * openers/callback arguments of the mutation calls (2272, 2275, 2281,
 * 2284) and 2298 are absent from this excerpt.
 */
2200 static void
2202 {
2204  struct ReferendumEntry *input_rfn;
2205  struct RfnElementInfo *ri;
2206  struct SetEntry *output_set;
2207  struct SetMutationProgressCls *progress_cls;
2208  struct ConsensusSession *session = task->step->session;
2209  struct SetKey sk_in;
2210  struct SetKey sk_out;
2211  struct RfnKey rk_in;
2212 
2213  sk_in = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, task->key.repetition, task->key.leader };
2214  sk_out = (struct SetKey) { SET_KIND_ECHO_RESULT, task->key.repetition, task->key.leader };
2215  output_set = lookup_set(session, &sk_out);
2216  if (NULL == output_set)
2217  {
2218  create_set_copy_for_task(task, &sk_in, &sk_out);
2219  return;
2220  }
2221 
2222 
2223  {
2224  // FIXME: should be marked as a shallow copy, so
2225  // we can destroy everything correctly
2226  struct SetEntry *last_set = GNUNET_new(struct SetEntry);
2227  last_set->h = output_set->h;
2228  last_set->key = (struct SetKey) { SET_KIND_LAST_GRADECAST };
2229  put_set(session, last_set);
2230  }
2231 
2233  "Evaluating referendum in Task {%s}\n",
2234  debug_str_task_key(&task->key));
2235 
2236  progress_cls = GNUNET_new(struct SetMutationProgressCls);
2237  progress_cls->task = task;
2238 
2239  rk_in = (struct RfnKey) { RFN_KIND_ECHO, task->key.repetition, task->key.leader };
2240  input_rfn = lookup_rfn(session, &rk_in);
2241 
2242  GNUNET_assert(NULL != input_rfn);
2243 
2245  GNUNET_assert(NULL != iter);
2246 
2247  while (GNUNET_YES ==
2249  NULL,
2250  (const void **)&ri))
2251  {
2252  enum ReferendumVote majority_vote;
2253  uint16_t majority_num;
2254 
2255  rfn_majority(input_rfn, ri, &majority_num, &majority_vote);
2256 
2257  if (majority_num < session->num_peers / 3)
2258  {
2259  /* It is not the case that all nonfaulty peers
2260  echoed the same value. Since we're doing a set reconciliation, we
2261  can't simply send "nothing" for the value. Thus we mark our 'confirm'
2262  reconciliation as contested. Other peers might not know that the
2263  leader is faulty, thus we still re-distribute in the confirmation
2264  round. */
2265  output_set->is_contested = GNUNET_YES;
2266  }
2267 
2268  switch (majority_vote)
2269  {
2270  case VOTE_ADD:
2271  progress_cls->num_pending++;
2273  GNUNET_SET_add_element(output_set->h,
2274  ri->element,
2276  progress_cls));
2277  break;
2278 
2279  case VOTE_REMOVE:
2280  progress_cls->num_pending++;
2282  GNUNET_SET_remove_element(output_set->h,
2283  ri->element,
2285  progress_cls));
2286  break;
2287 
2288  case VOTE_STAY:
2289  /* Nothing to do. */
2290  break;
2291 
2292  default:
2293  /* not reached */
2294  GNUNET_assert(0);
2295  }
2296  }
2297 
2299 
2300  if (0 == progress_cls->num_pending)
2301  {
2302  // call closure right now, no pending ops
2303  GNUNET_free(progress_cls);
2304  finish_task(task);
2305  }
2306 }
2307 
2308 
/**
 * Start the final task: iterate over the session's final set.
 *
 * Looks up the set named by the task's finish closure (it must exist)
 * and starts a GNUNET_SET_iterate over it with the task as closure.
 * NOTE(review): listing lines 2310 (function name and parameters) and
 * 2321 (the iteration callback argument) are absent from this excerpt.
 */
2309 static void
2311 {
2312  struct SetEntry *final_set;
2313  struct ConsensusSession *session = task->step->session;
2314 
2315  final_set = lookup_set(session, &task->cls.finish.input_set);
2316 
2317  GNUNET_assert(NULL != final_set);
2318 
2319 
2320  GNUNET_SET_iterate(final_set->h,
2322  task);
2323 }
2324 
/**
 * Run a task's start function and mark the task as started.
 *
 * The `is_started` flag is set only after the start function returns.
 * NOTE(review): listing lines 2330-2331 are absent from this excerpt.
 *
 * @param session session the task belongs to (used for logging here)
 * @param task task to start; its `start` function must be set
 */
2325 static void
2326 start_task(struct ConsensusSession *session, struct TaskEntry *task)
2327 {
2328  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", session->local_peer_idx, debug_str_task_key(&task->key));
2329 
2332  GNUNET_assert(NULL != task->start);
2333 
2334  task->start(task);
2335 
2336  task->is_started = GNUNET_YES;
2337 }
2338 
2339 
2340 
2341 
2342 /*
2343  * Run all steps of the session that don't have any
2344  * more dependencies.
2345  *
      * Walks the session's step list and picks the first step that is not
      * running, not finished and has no pending prerequisites.  That step
      * is marked running, all of its tasks are started, and it is finished
      * on the spot if every task already completed.  The function returns
      * after handling that one step.
      * NOTE(review): listing line 2347 (function name and parameters) is
      * absent from this excerpt.
      */
2346 static void
2348 {
2349  struct Step *step;
2350 
2351  step = session->steps_head;
2352 
2353  while (NULL != step)
2354  {
2355  if ((GNUNET_NO == step->is_running) && (0 == step->pending_prereq) && (GNUNET_NO == step->is_finished))
2356  {
2357  size_t i;
2358 
2359  GNUNET_assert(0 == step->finished_tasks);
2360 
2361 #ifdef GNUNET_EXTRA_LOGGING
2362  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "P%u: Running step `%s' of round %d with %d tasks and %d subordinates\n",
2363  session->local_peer_idx,
2364  step->debug_name,
2365  step->round, step->tasks_len, step->subordinates_len);
2366 #endif
2367 
2368  step->is_running = GNUNET_YES;
2369  for (i = 0; i < step->tasks_len; i++)
2370  start_task(session, step->tasks[i]);
2371 
2372  /* Sometimes there is no task to trigger finishing the step, so we have to do it here. */
2373  if ((step->finished_tasks == step->tasks_len) && (GNUNET_NO == step->is_finished))
2374  finish_step(step);
2375 
2376  /* Running the next ready steps will be triggered by task completion */
2377  return;
2378  }
2379  step = step->next;
2380  }
2381 
2382  return;
2383 }
2384 
2385 
2386 
/**
 * Mark a task as finished and finish its step once all tasks are done.
 *
 * Increments the step's finished-task counter; when it equals the
 * number of tasks in the step, finish_step is called.
 * NOTE(review): listing lines 2390 and 2395 (the opener of the log call)
 * are absent from this excerpt.
 *
 * @param task task that has completed
 */
2387 static void
2388 finish_task(struct TaskEntry *task)
2389 {
2391  task->is_finished = GNUNET_YES;
2392 
2393  task->step->finished_tasks++;
2394 
2396  "P%u: Finishing Task {%s} (now %u/%u tasks finished in step)\n",
2397  task->step->session->local_peer_idx,
2398  debug_str_task_key(&task->key),
2399  (unsigned int)task->step->finished_tasks,
2400  (unsigned int)task->step->tasks_len);
2401 
2402  if (task->step->finished_tasks == task->step->tasks_len)
2403  finish_step(task->step);
2404 }
2405 
2406 
2414 static int
2415 get_peer_idx(const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
2416 {
2417  int i;
2418 
2419  for (i = 0; i < session->num_peers; i++)
2420  if (0 == GNUNET_memcmp(peer, &session->peers[i]))
2421  return i;
2422  return -1;
2423 }
2424 
2425 
/**
 * Derive the session's global identifier.
 *
 * Calls GNUNET_CRYPTO_kdf to fill `session->global_id` (the size of one
 * hash code) from a fixed salt string, the session's peer array and the
 * local session id.
 * NOTE(review): listing lines 2436 (function name and first parameter;
 * the body uses `session`) and 2441 (the wrapper around the KDF call,
 * closed by the extra parenthesis on the last argument line) are absent
 * from this excerpt.
 *
 * @param local_session_id locally supplied session id fed into the KDF
 */
2435 static void
2437  const struct GNUNET_HashCode *local_session_id)
2438 {
2439  const char *salt = "gnunet-service-consensus/session_id";
2440 
2442  GNUNET_CRYPTO_kdf(&session->global_id,
2443  sizeof(struct GNUNET_HashCode),
2444  salt,
2445  strlen(salt),
2446  session->peers,
2447  session->num_peers * sizeof(struct GNUNET_PeerIdentity),
2448  local_session_id,
2449  sizeof(struct GNUNET_HashCode),
2450  NULL));
2451 }
2452 
2453 
2461 static int
2462 peer_id_cmp(const void *h1, const void *h2)
2463 {
2464  return memcmp(h1, h2, sizeof(struct GNUNET_PeerIdentity));
2465 }
2466 
2467 
/**
 * Build the session's sorted peer array from a join message.
 *
 * The peer identities follow the join message header directly.  If the
 * local peer (`my_peer`) is not among them, the array is made one entry
 * larger and the local peer is placed in the last slot.  The array is
 * finally sorted with peer_id_cmp.
 * NOTE(review): listing line 2476 (function name and first parameter;
 * the body uses `session`) is absent from this excerpt.
 *
 * @param join_msg join message carrying `num_peers` (network byte order)
 *        followed by that many peer identities
 */
2475 static void
2477  const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
2478 {
2479  const struct GNUNET_PeerIdentity *msg_peers
2480  = (const struct GNUNET_PeerIdentity *)&join_msg[1];
2481  int local_peer_in_list;
2482 
2483  session->num_peers = ntohl(join_msg->num_peers);
2484 
2485  /* Peers in the join message, may or may not include the local peer,
2486  Add it if it is missing. */
2487  local_peer_in_list = GNUNET_NO;
2488  for (unsigned int i = 0; i < session->num_peers; i++)
2489  {
2490  if (0 == GNUNET_memcmp(&msg_peers[i],
2491  &my_peer))
2492  {
2493  local_peer_in_list = GNUNET_YES;
2494  break;
2495  }
2496  }
2497  if (GNUNET_NO == local_peer_in_list)
2498  session->num_peers++;
2499 
2500  session->peers = GNUNET_new_array(session->num_peers,
2501  struct GNUNET_PeerIdentity);
2502  if (GNUNET_NO == local_peer_in_list)
2503  session->peers[session->num_peers - 1] = my_peer;
2504 
      /* Copies only the message's own entries, so an appended local peer
         in the last slot is left untouched. */
2505  GNUNET_memcpy(session->peers,
2506  msg_peers,
2507  ntohl(join_msg->num_peers) * sizeof(struct GNUNET_PeerIdentity));
2508  qsort(session->peers,
2509  session->num_peers,
2510  sizeof(struct GNUNET_PeerIdentity),
2511  &peer_id_cmp);
2512 }
2513 
2514 
/**
 * Look up a task by its key.
 *
 * Hashes the raw bytes of the key and queries the session's task map
 * with that hash.
 * NOTE(review): listing line 2516 (function name and first parameter;
 * the body uses `session`) is absent from this excerpt.
 *
 * @param key task key to look up
 * @return whatever the task map returns for the key's hash
 */
2515 static struct TaskEntry *
2517  struct TaskKey *key)
2518 {
2519  struct GNUNET_HashCode hash;
2520 
2521 
2522  GNUNET_CRYPTO_hash(key, sizeof(struct TaskKey), &hash);
2523  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Looking up task hash %s\n",
2524  GNUNET_h2s(&hash));
2525  return GNUNET_CONTAINER_multihashmap_get(session->taskmap, &hash);
2526 }
2527 
2528 
/**
 * Handle an incoming set-operation request from another peer.
 *
 * The request is dropped (after GNUNET_break_op) when the context
 * message is missing, has the wrong type or size, names an unknown or
 * already finished task, or names a task in which the local peer is not
 * peer2.  Otherwise the request is accepted, the resulting operation is
 * stored in the task's set-operation closure, and the set is committed
 * right away if the task has already been started.
 *
 * NOTE(review): listing lines 2553 (declaration of `cm`), 2612 and 2616
 * are absent from this excerpt.
 *
 * @param cls the `struct ConsensusSession`
 * @param other_peer peer that sent the request (not read in the visible
 *        lines)
 * @param context_msg round-context message identifying the task
 * @param request request handle passed on to GNUNET_SET_accept
 */
2544 static void
2545 set_listen_cb(void *cls,
2546  const struct GNUNET_PeerIdentity *other_peer,
2547  const struct GNUNET_MessageHeader *context_msg,
2548  struct GNUNET_SET_Request *request)
2549 {
2550  struct ConsensusSession *session = cls;
2551  struct TaskKey tk;
2552  struct TaskEntry *task;
2554 
2555  if (NULL == context_msg)
2556  {
2557  GNUNET_break_op(0);
2558  return;
2559  }
2560 
2561  if (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT != ntohs(context_msg->type))
2562  {
2563  GNUNET_break_op(0);
2564  return;
2565  }
2566 
2567  if (sizeof(struct GNUNET_CONSENSUS_RoundContextMessage) != ntohs(context_msg->size))
2568  {
2569  GNUNET_break_op(0);
2570  return;
2571  }
2572 
2573  cm = (struct GNUNET_CONSENSUS_RoundContextMessage *)context_msg;
2574 
      /* Rebuild the task key from the network-byte-order fields. */
2575  tk = ((struct TaskKey) {
2576  .kind = ntohs(cm->kind),
2577  .peer1 = ntohs(cm->peer1),
2578  .peer2 = ntohs(cm->peer2),
2579  .repetition = ntohs(cm->repetition),
2580  .leader = ntohs(cm->leader),
2581  });
2582 
2583  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "P%u: got req for task %s\n",
2584  session->local_peer_idx, debug_str_task_key(&tk));
2585 
2586  task = lookup_task(session, &tk);
2587 
2588  if (NULL == task)
2589  {
2590  GNUNET_break_op(0);
2591  return;
2592  }
2593 
2594  if (GNUNET_YES == task->is_finished)
2595  {
2596  GNUNET_break_op(0);
2597  return;
2598  }
2599 
2600  if (task->key.peer2 != session->local_peer_idx)
2601  {
2602  /* We're being asked, so we must be the 2nd peer. */
2603  GNUNET_break_op(0);
2604  return;
2605  }
2606 
2607  GNUNET_assert(!((task->key.peer1 == session->local_peer_idx) &&
2608  (task->key.peer2 == session->local_peer_idx)));
2609 
2610  struct GNUNET_SET_Option opts[] = {
2611  { GNUNET_SET_OPTION_BYZANTINE, { .num = session->lower_bound } },
2613  };
2614 
2615  task->cls.setop.op = GNUNET_SET_accept(request,
2617  opts,
2618  set_result_cb,
2619  task);
2620 
2621  /* If the task hasn't been started yet,
2622  we wait for that until we commit. */
2623 
2624  if (GNUNET_YES == task->is_started)
2625  {
2626  commit_set(session, task);
2627  }
2628 }
2629 
2630 
2631 
/**
 * Register a copy of a task with its step and with a task map.
 *
 * The entry is duplicated with GNUNET_memdup, so the caller's struct can
 * be reused.  The copy is appended to the step's task array, whose
 * capacity target becomes 3 * (cap + 1) / 2 when it is full, and is then
 * inserted into the map under the hash of its key.
 * NOTE(review): listing lines 2633 (function name and first parameter;
 * the body uses `taskmap`), 2648 (the opener of the array-grow call),
 * 2664 and 2666 are absent from this excerpt.
 *
 * @param t task template; must have its `step` set
 */
2632 static void
2634  struct TaskEntry *t)
2635 {
2636  struct GNUNET_HashCode round_hash;
2637  struct Step *s;
2638 
2639  GNUNET_assert(NULL != t->step);
2640 
      /* From here on `t` refers to the heap copy, not the caller's struct. */
2641  t = GNUNET_memdup(t, sizeof(struct TaskEntry));
2642 
2643  s = t->step;
2644 
2645  if (s->tasks_len == s->tasks_cap)
2646  {
2647  unsigned int target_size = 3 * (s->tasks_cap + 1) / 2;
2649  s->tasks_cap,
2650  target_size);
2651  }
2652 
2653 #ifdef GNUNET_EXTRA_LOGGING
2654  GNUNET_assert(NULL != s->debug_name);
2655  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Putting task <%s> into step `%s'\n",
2656  debug_str_task_key(&t->key),
2657  s->debug_name);
2658 #endif
2659 
2660  s->tasks[s->tasks_len] = t;
2661  s->tasks_len++;
2662 
2663  GNUNET_CRYPTO_hash(&t->key, sizeof(struct TaskKey), &round_hash);
2665  GNUNET_CONTAINER_multihashmap_put(taskmap, &round_hash, t,
2667 }
2668 
2669 
/**
 * Placeholder for assigning timeouts to steps; the body contains no
 * statements yet (see the XXX note inside).
 * NOTE(review): listing line 2671 (function name and parameters) is
 * absent from this excerpt.
 */
2670 static void
2672 {
2673  /* Given the fully constructed task graph
2674  with rounds for tasks, we can give the tasks timeouts. */
2675 
2676  // unsigned int max_round;
2677 
2678  /* XXX: implement! */
2679 }
2680 
2681 
2682 
/**
 * Arrange two peers in some canonical order.
 *
 * Both indices must be smaller than @a n (asserted).  With lo / hi being
 * the smaller / larger of the two inputs, the result is (lo, hi) when
 * ((hi - lo) + n) % n is at most n / 2 and (hi, lo) otherwise, so the
 * outcome does not depend on the order in which the inputs were given.
 *
 * @param[in,out] p1 first peer index
 * @param[in,out] p2 second peer index
 * @param n number of peers
 */
static void
arrange_peers(uint16_t *p1, uint16_t *p2, uint16_t n)
{
  uint16_t lo;
  uint16_t hi;

  GNUNET_assert(*p1 < n);
  GNUNET_assert(*p2 < n);

  lo = (*p1 < *p2) ? *p1 : *p2;
  hi = (*p1 < *p2) ? *p2 : *p1;

  /* For uniformly random inputs the swap below happens about half
     of the time. */
  if (((hi - lo) + n) % n > n / 2)
  {
    *p1 = hi;
    *p2 = lo;
    return;
  }

  *p1 = lo;
  *p2 = hi;
}
2719 
2720 
/**
 * Record that @a step may only run after @a dep.
 *
 * Appends @a step to the subordinates of @a dep (the capacity target
 * becomes 3 * (cap + 1) / 2 when the array is full) and increments the
 * pending-prerequisite counter of @a step.  The two steps must be
 * distinct and non-NULL, and @a dep must not be in a later round.
 * NOTE(review): listing lines 2740 (the opener of the log call), 2749
 * (the opener of the array-grow call) and 2754 are absent from this
 * excerpt.
 * NOTE(review): `step != dep` is asserted before the NULL checks; if both
 * were NULL the first assertion would be the one to fire.
 *
 * @param step dependent step
 * @param dep step that has to finish first
 */
2724 static void
2725 step_depend_on(struct Step *step, struct Step *dep)
2726 {
2727  /* We're not checking for cyclic dependencies,
2728  but this is a cheap sanity check. */
2729  GNUNET_assert(step != dep);
2730  GNUNET_assert(NULL != step);
2731  GNUNET_assert(NULL != dep);
2732  GNUNET_assert(dep->round <= step->round);
2733 
2734 #ifdef GNUNET_EXTRA_LOGGING
2735  /* Make sure we have complete debugging information.
2736  Also checks that we don't screw up too badly
2737  constructing the task graph. */
2738  GNUNET_assert(NULL != step->debug_name);
2739  GNUNET_assert(NULL != dep->debug_name);
2741  "Making step `%s' depend on `%s'\n",
2742  step->debug_name,
2743  dep->debug_name);
2744 #endif
2745 
2746  if (dep->subordinates_cap == dep->subordinates_len)
2747  {
2748  unsigned int target_size = 3 * (dep->subordinates_cap + 1) / 2;
2750  dep->subordinates_cap,
2751  target_size);
2752  }
2753 
2755 
2756  dep->subordinates[dep->subordinates_len] = step;
2757  dep->subordinates_len++;
2758 
2759  step->pending_prereq++;
2760 }
2761 
2762 
/**
 * Allocate a step and attach it to a session.
 *
 * The visible lines store the session and the round in the new step.
 * NOTE(review): listing lines 2764 (function name and parameters; the
 * body uses `session` and `round`) and 2771-2772 are absent from this
 * excerpt; the dangling argument lines reference
 * `session->steps_tail`, so the missing call presumably links the step
 * into the session's step list — confirm against the full source.
 *
 * @return the newly allocated step
 */
2763 static struct Step *
2765 {
2766  struct Step *step;
2767 
2768  step = GNUNET_new(struct Step);
2769  step->session = session;
2770  step->round = round;
2773  session->steps_tail,
2774  step);
2775  return step;
2776 }
2777 
2778 
/**
 * Add the steps and tasks of one gradecast (one leader, one repetition)
 * to the session's task graph.
 *
 * Five steps are created and chained after @a step_before, starting at
 * round `step_before->round + 1`:
 *  1. "disseminate": the leader reconciles with every other peer (plus a
 *     self-task producing its own leader-proposal set and diff); a
 *     non-leader reconciles with the leader only;
 *  2. "echo": reconciliation of the leader-proposal set with every peer,
 *     feeding the RFN_KIND_ECHO referendum;
 *  3. "echo grade": one local task running task_start_eval_echo (same
 *     round as step 2);
 *  4. "confirm": reconciliation of the echo-result set with every peer,
 *     feeding the RFN_KIND_CONFIRM referendum;
 *  5. "confirm grade": one local task running task_start_grade (same
 *     round as step 4).
 * Finally @a step_after is made to depend on the last step.
 *
 * NOTE(review): listing lines 2784 (function name and first parameter;
 * the body uses `session`) and 2934 are absent from this excerpt.
 *
 * @param rep repetition number
 * @param lead index of the leader of this gradecast
 * @param step_before step the gradecast has to wait for
 * @param step_after step that has to wait for the gradecast
 */
2783 static void
2785  uint16_t rep,
2786  uint16_t lead,
2787  struct Step *step_before,
2788  struct Step *step_after)
2789 {
2790  uint16_t n = session->num_peers;
2791  uint16_t me = session->local_peer_idx;
2792 
2793  uint16_t p1;
2794  uint16_t p2;
2795 
2796  /* The task we're currently setting up. */
2797  struct TaskEntry task;
2798 
2799  struct Step *step;
2800  struct Step *prev_step;
2801 
2802  uint16_t round;
2803 
2804  unsigned int k;
2805 
2806  round = step_before->round + 1;
2807 
2808  /* gcast step 1: leader disseminates */
2809 
2810  step = create_step(session, round, GNUNET_YES);
2811 
2812 #ifdef GNUNET_EXTRA_LOGGING
2813  GNUNET_asprintf(&step->debug_name, "disseminate leader %u rep %u", lead, rep);
2814 #endif
2815  step_depend_on(step, step_before);
2816 
2817  if (lead == me)
2818  {
2819  for (k = 0; k < n; k++)
2820  {
2821  if (k == me)
2822  continue;
2823  p1 = me;
2824  p2 = k;
2825  arrange_peers(&p1, &p2, n);
2826  task = ((struct TaskEntry) {
2827  .step = step,
2828  .start = task_start_reconcile,
2829  .cancel = task_cancel_reconcile,
2830  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me },
2831  });
2832  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2833  put_task(session->taskmap, &task);
2834  }
2835  /* We run this task to make sure that the leader
2836  has stored the SET_KIND_LEADER set of himself,
2837  so it can participate in the rest of the gradecast
2838  without the code having to handle any special cases. */
2839  task = ((struct TaskEntry) {
2840  .step = step,
2841  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
2842  .start = task_start_reconcile,
2843  .cancel = task_cancel_reconcile,
2844  });
2845  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2846  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, me };
2847  task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, me };
2848  put_task(session->taskmap, &task);
2849  }
2850  else
2851  {
2852  p1 = me;
2853  p2 = lead;
2854  arrange_peers(&p1, &p2, n);
2855  task = ((struct TaskEntry) {
2856  .step = step,
2857  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, lead },
2858  .start = task_start_reconcile,
2859  .cancel = task_cancel_reconcile,
2860  });
2861  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
2862  task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2863  task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, rep, lead };
2864  put_task(session->taskmap, &task);
2865  }
2866 
2867  /* gcast phase 2: echo */
2868  prev_step = step;
2869  round += 1;
2870  step = create_step(session, round, GNUNET_YES);
2871 #ifdef GNUNET_EXTRA_LOGGING
2872  GNUNET_asprintf(&step->debug_name, "echo leader %u rep %u", lead, rep);
2873 #endif
2874  step_depend_on(step, prev_step);
2875 
2876  for (k = 0; k < n; k++)
2877  {
2878  p1 = k;
2879  p2 = me;
2880  arrange_peers(&p1, &p2, n);
2881  task = ((struct TaskEntry) {
2882  .step = step,
2883  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
2884  .start = task_start_reconcile,
2885  .cancel = task_cancel_reconcile,
2886  });
2887  task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, rep, lead };
2888  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
2889  put_task(session->taskmap, &task);
2890  }
2891 
2892  prev_step = step;
2893  /* Same round, since step only has local tasks */
2894  step = create_step(session, round, GNUNET_YES);
2895 #ifdef GNUNET_EXTRA_LOGGING
2896  GNUNET_asprintf(&step->debug_name, "echo grade leader %u rep %u", lead, rep);
2897 #endif
2898  step_depend_on(step, prev_step);
2899 
      /* NOTE(review): p1/p2 still hold the values from the last iteration of
         the echo loop and are not read again before being reassigned; this
         call looks redundant — confirm before removing. */
2900  arrange_peers(&p1, &p2, n);
2901  task = ((struct TaskEntry) {
2902  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, lead },
2903  .step = step,
2904  .start = task_start_eval_echo
2905  });
2906  put_task(session->taskmap, &task);
2907 
2908  prev_step = step;
2909  round += 1;
2910  step = create_step(session, round, GNUNET_YES);
2911 #ifdef GNUNET_EXTRA_LOGGING
2912  GNUNET_asprintf(&step->debug_name, "confirm leader %u rep %u", lead, rep);
2913 #endif
2914  step_depend_on(step, prev_step);
2915 
2916  /* gcast phase 3: confirmation and grading */
2917  for (k = 0; k < n; k++)
2918  {
2919  p1 = k;
2920  p2 = me;
2921  arrange_peers(&p1, &p2, n);
2922  task = ((struct TaskEntry) {
2923  .step = step,
2924  .start = task_start_reconcile,
2925  .cancel = task_cancel_reconcile,
2926  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, lead },
2927  });
2928  task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead };
2929  task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead };
2930  /* If there was at least one element in the echo round that was
2931  contested (i.e. it had no n-t majority), then we let the other peers
2932  know, and other peers let us know. The contested flag for each peer is
2933  stored in the rfn. */
2935  put_task(session->taskmap, &task);
2936  }
2937 
2938  prev_step = step;
2939  /* Same round, since step only has local tasks */
2940  step = create_step(session, round, GNUNET_YES);
2941 #ifdef GNUNET_EXTRA_LOGGING
2942  GNUNET_asprintf(&step->debug_name, "confirm grade leader %u rep %u", lead, rep);
2943 #endif
2944  step_depend_on(step, prev_step);
2945 
2946  task = ((struct TaskEntry) {
2947  .step = step,
2948  .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, lead },
2949  .start = task_start_grade,
2950  });
2951  put_task(session->taskmap, &task);
2952 
2953  step_depend_on(step_after, step);
2954 }
2955 
2956 
/**
 * Build the complete step/task graph for a consensus session.
 *
 * NOTE(review): the listing line that carries the function name and
 * parameter list (2958) is absent from this extract; the body uses a
 * variable named `session` and the only caller-side match visible in the
 * file is construct_task_graph(session) -- confirm against the real source.
 *
 * Visible structure, in order:
 *  - one "all to all" step with one reconcile task per peer index,
 *  - a second "all to all 2" step depending on the first,
 *  - t + 1 sequential gradecast repetitions (t = n / 3), each consisting of
 *    a start step, n gradecasts built by construct_task_graph_gradecast(),
 *    and an end step carrying a PHASE_KIND_APPLY_REP task,
 *  - a final "finish" step carrying a PHASE_KIND_FINISH task.
 *
 * The visible lines only create steps, record dependencies and register
 * tasks with put_task(); no task start function is invoked from here.
 */
2957 static void
2959 {
2960  uint16_t n = session->num_peers;
 /* The gradecast loop below runs t + 1 times. */
2961  uint16_t t = n / 3;
2962 
2963  uint16_t me = session->local_peer_idx;
2964 
2965  /* The task we're currently setting up. */
2966  struct TaskEntry task;
2967 
2968  /* Current leader */
2969  unsigned int lead;
2970 
2971  struct Step *step;
2972  struct Step *prev_step;
2973 
 /* Round counter handed to create_step(); advanced between phases. */
2974  unsigned int round = 0;
2975 
2976  unsigned int i;
2977 
2978  // XXX: introduce first step,
2979  // where we wait for all insert acks
2980  // from the set service
2981 
2982  /* faster but brittle all-to-all */
2983 
2984  // XXX: Not implemented yet
2985 
2986  /* all-to-all step */
2987 
2988  step = create_step(session, round, GNUNET_NO);
2989 
2990 #ifdef GNUNET_EXTRA_LOGGING
2991  step->debug_name = GNUNET_strdup("all to all");
2992 #endif
2993 
 /* One reconcile task per peer index i, pairing the local peer with i
    (this includes i == me). */
2994  for (i = 0; i < n; i++)
2995  {
2996  uint16_t p1;
2997  uint16_t p2;
2998 
2999  p1 = me;
3000  p2 = i;
 /* NOTE(review): presumably puts the pair into canonical order so both
    sides derive the same task key -- confirm in arrange_peers(). */
3001  arrange_peers(&p1, &p2, n);
 /* The two trailing -1 key fields are not used by this phase
    (presumably repetition and leader -- confirm against struct TaskKey). */
3002  task = ((struct TaskEntry) {
3003  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
3004  .step = step,
3005  .start = task_start_reconcile,
3006  .cancel = task_cancel_reconcile,
3007  });
 /* Input and output are the same set key: the result is written back
    to the set it was read from. */
3008  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3009  task.cls.setop.output_set = task.cls.setop.input_set;
 /* NOTE(review): listing line 3010 is absent from this extract. */
3011  put_task(session->taskmap, &task);
3012  }
3013 
3014  round += 1;
3015  prev_step = step;
 /* The second ';' below is an empty statement. */
3016  step = create_step(session, round, GNUNET_NO);;
3017 #ifdef GNUNET_EXTRA_LOGGING
3018  step->debug_name = GNUNET_strdup("all to all 2");
3019 #endif
3020  step_depend_on(step, prev_step);
3021 
3022 
 /* Same task layout as the first all-to-all step, under a different
    phase kind. */
3023  for (i = 0; i < n; i++)
3024  {
3025  uint16_t p1;
3026  uint16_t p2;
3027 
3028  p1 = me;
3029  p2 = i;
3030  arrange_peers(&p1, &p2, n);
3031  task = ((struct TaskEntry) {
3032  .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL_2, p1, p2, -1, -1 },
3033  .step = step,
3034  .start = task_start_reconcile,
3035  .cancel = task_cancel_reconcile,
3036  });
3037  task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
3038  task.cls.setop.output_set = task.cls.setop.input_set;
 /* NOTE(review): listing line 3039 is absent from this extract. */
3040  put_task(session->taskmap, &task);
3041  }
3042 
3043  round += 1;
3044 
 /* prev_step now refers to the second all-to-all step; the first
    gradecast repetition depends on it. */
3045  prev_step = step;
3046  step = NULL;
3047 
3048 
3049 
3050  /* Byzantine union */
3051 
3052  /* sequential repetitions of the gradecasts */
3053  for (i = 0; i < t + 1; i++)
3054  {
3055  struct Step *step_rep_start;
3056  struct Step *step_rep_end;
3057 
3058  /* Every repetition is in a separate round. */
3059  step_rep_start = create_step(session, round, GNUNET_YES);
3060 #ifdef GNUNET_EXTRA_LOGGING
3061  GNUNET_asprintf(&step_rep_start->debug_name, "gradecast start rep %u", i);
3062 #endif
3063 
3064  step_depend_on(step_rep_start, prev_step);
3065 
3066  /* gradecast has three rounds */
3067  round += 3;
3068  step_rep_end = create_step(session, round, GNUNET_YES);
3069 #ifdef GNUNET_EXTRA_LOGGING
3070  GNUNET_asprintf(&step_rep_end->debug_name, "gradecast end rep %u", i);
3071 #endif
3072 
3073  /* parallel gradecasts */
3074  for (lead = 0; lead < n; lead++)
3075  construct_task_graph_gradecast(session, i, lead, step_rep_start, step_rep_end);
3076 
 /* The end step of the repetition carries the apply task for rep i. */
3077  task = ((struct TaskEntry) {
3078  .step = step_rep_end,
3079  .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1 },
3080  .start = task_start_apply_round,
3081  });
3082  put_task(session->taskmap, &task);
3083 
 /* Chain repetitions: the next start step depends on this end step. */
3084  prev_step = step_rep_end;
3085  }
3086 
3087  /* There is no next gradecast round, thus the final
3088  start step is the overall end step of the gradecasts */
3089  round += 1;
3090  step = create_step(session, round, GNUNET_NO);
3091 #ifdef GNUNET_EXTRA_LOGGING
3092  GNUNET_asprintf(&step->debug_name, "finish");
3093 #endif
3094  step_depend_on(step, prev_step);
3095 
3096  task = ((struct TaskEntry) {
3097  .step = step,
3098  .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
3099  .start = task_start_finish,
3100  });
 /* Only the set kind is given; the remaining SetKey fields are
    zero-initialized by the compound literal. */
3101  task.cls.finish.input_set = (struct SetKey) { SET_KIND_LAST_GRADECAST };
3102 
3103  put_task(session->taskmap, &task);
3104 }
3105 
3106 
3107 
/**
 * Check a join message for consistent size.
 *
 * The bytes following the fixed-size message struct must be exactly
 * num_peers (sent in network byte order) peer identities.
 *
 * NOTE(review): the listing line with the function name and first
 * parameter (3116) is absent from this extract; the file index lists it as
 * check_client_join(void *cls, ...).
 *
 * @param m the join message to validate
 * @return #GNUNET_OK if the payload length matches the announced number of
 *         peers, #GNUNET_SYSERR (after GNUNET_break(0)) otherwise
 */
3115 static int
3117  const struct GNUNET_CONSENSUS_JoinMessage *m)
3118 {
3119  uint32_t listed_peers = ntohl(m->num_peers);
3120 
 /* Payload length (total size minus fixed header) vs. expected length. */
3121  if ((ntohs(m->header.size) - sizeof(*m)) !=
3122  listed_peers * sizeof(struct GNUNET_PeerIdentity))
3123  {
3124  GNUNET_break(0);
3125  return GNUNET_SYSERR;
3126  }
3127  return GNUNET_OK;
3128 }
3129 
3130 
/**
 * Called when a client wants to join a consensus session.
 *
 * Visible effects on the session passed as closure: the global id is
 * computed from the message's session id, the local peer index is looked
 * up, a set listener is registered under the global id, an initial
 * SET_KIND_CURRENT set is created and stored, the peers_blacklisted array
 * is allocated and the task graph is constructed.
 *
 * NOTE(review): several listing lines are absent from this extract (3138,
 * 3144, 3163, 3165, 3170, 3175-3176, 3182, 3187-3193 odd lines, 3201,
 * 3204, 3218); comments below describe only what the visible lines show.
 *
 * @param m the join message
 */
3137 static void
3139  const struct GNUNET_CONSENSUS_JoinMessage *m)
3140 {
3141  struct ConsensusSession *session = cls;
3142  struct ConsensusSession *other_session;
3143 
3145  m);
3146  compute_global_id(session,
3147  &m->session_id);
3148 
3149  /* Check if some local client already owns the session.
3150  It is only legal to have a session with an existing global id
3151  if all other sessions with this global id are finished.*/
 /* NOTE(review): the loop stops at the first other session with the same
    global id, but other_session is not read again in the visible lines,
    so the check has no visible consequence -- confirm intent. */
3152  for (other_session = sessions_head;
3153  NULL != other_session;
3154  other_session = other_session->next)
3155  {
3156  if ((other_session != session) &&
3157  (0 == GNUNET_CRYPTO_hash_cmp(&session->global_id,
3158  &other_session->global_id)))
3159  break;
3160  }
3161 
3162  session->conclude_deadline
3164  session->conclude_start
3166  session->local_peer_idx = get_peer_idx(&my_peer,
3167  session);
 /* The local peer must be part of the session's peer list. */
3168  GNUNET_assert(-1 != session->local_peer_idx);
3169 
3171  "Joining consensus session %s containing %u peers as %u with timeout %s\n",
3172  GNUNET_h2s(&m->session_id),
3173  session->num_peers,
3174  session->local_peer_idx,
3177  session->conclude_deadline),
3178  GNUNET_YES));
3179 
 /* Listen for set operation requests addressed to this session's
    global id; set_listen_cb receives the session as closure. */
3180  session->set_listener
3181  = GNUNET_SET_listen(cfg,
3183  &session->global_id,
3184  &set_listen_cb,
3185  session);
3186 
3188  GNUNET_NO);
3190  GNUNET_NO);
3192  GNUNET_NO);
3194  GNUNET_NO);
3195 
 /* Create the initial set and register it under the key
    { SET_KIND_CURRENT, 0, 0 }. */
3196  {
3197  struct SetEntry *client_set;
3198 
3199  client_set = GNUNET_new(struct SetEntry);
3200  client_set->h = GNUNET_SET_create(cfg,
 /* The SetHandle wraps the same set handle; NOTE(review): the arguments
    on the following lines suggest it is linked into the session's
    set_handles list -- the call line itself (3204) is absent here. */
3202  struct SetHandle *sh = GNUNET_new(struct SetHandle);
3203  sh->h = client_set->h;
3205  session->set_handles_tail,
3206  sh);
3207  client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 });
3208  put_set(session,
3209  client_set);
3210  }
3211 
 /* One int per peer in the session. */
3212  session->peers_blacklisted = GNUNET_new_array(session->num_peers,
3213  int);
3214 
3215  /* Just construct the task graph,
3216  but don't run anything until the client calls conclude. */
3217  construct_task_graph(session);
3219 }
3220 
3221 
/**
 * Placeholder with an empty body; nothing is done here yet (see FIXME).
 *
 * NOTE(review): the listing line with the function name (3223) is absent
 * from this extract; the file index lists client_insert_done(void *cls).
 */
3222 static void
3224 {
3225  // FIXME: implement
3226 }
3227 
3228 
/**
 * Validation hook for element messages sent by a client.
 *
 * Performs no checks of its own: every message is accepted.
 *
 * NOTE(review): the listing line with the function name and first
 * parameter (3237) is absent from this extract.
 *
 * @param msg the element message (not inspected)
 * @return always #GNUNET_OK
 */
3236 static int
3238  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3239 {
3240  return GNUNET_OK;
3241 }
3242 
3243 
/**
 * Called when a client performs an insert operation.
 *
 * Wraps the message payload in a ConsensusElement and adds it to the
 * session's { SET_KIND_CURRENT, 0, 0 } set. Inserts arriving after
 * conclude has started are rejected with GNUNET_break(0).
 *
 * NOTE(review): listing lines 3251, 3262, 3272, 3290, 3295 and 3302 are
 * absent from this extract; comments describe the visible lines only.
 *
 * @param msg the element message; the payload follows the message struct
 */
3250 static void
3252  const struct GNUNET_CONSENSUS_ElementMessage *msg)
3253 {
3254  struct ConsensusSession *session = cls;
3255  ssize_t element_size;
3256  struct GNUNET_SET_Handle *initial_set;
3257  struct ConsensusElement *ce;
3258 
 /* No inserts are accepted once conclude has been requested. */
3259  if (GNUNET_YES == session->conclude_started)
3260  {
3261  GNUNET_break(0);
3263  return;
3264  }
3265 
 /* Payload length = total message size minus the fixed message struct. */
3266  element_size = ntohs(msg->header.size) - sizeof(struct GNUNET_CONSENSUS_ElementMessage);
 /* The payload is stored directly behind the ConsensusElement header. */
3267  ce = GNUNET_malloc(sizeof(struct ConsensusElement) + element_size);
3268  GNUNET_memcpy(&ce[1], &msg[1], element_size);
 /* element_type is copied as-is, without byte-order conversion. */
3269  ce->payload_type = msg->element_type;
3270 
3271  struct GNUNET_SET_Element element = {
3273  .size = sizeof(struct ConsensusElement) + element_size,
3274  .data = ce,
3275  };
3276 
 /* The target set must already exist for this session. */
3277  {
3278  struct SetKey key = { SET_KIND_CURRENT, 0, 0 };
3279  struct SetEntry *entry;
3280 
3281  entry = lookup_set(session,
3282  &key);
3283  GNUNET_assert(NULL != entry);
3284  initial_set = entry->h;
3285  }
3286 
3287  session->num_client_insert_pending++;
3288  GNUNET_SET_add_element(initial_set,
3289  &element,
3291  session);
3292 
3293 #ifdef GNUNET_EXTRA_LOGGING
3294  {
3296  "P%u: element %s added\n",
3297  session->local_peer_idx,
3298  debug_str_element(&element));
3299  }
3300 #endif
 /* NOTE(review): freeing ce here assumes GNUNET_SET_add_element() does
    not keep the data pointer after it returns -- confirm. */
3301  GNUNET_free(ce);
3303 }
3304 
3305 
/**
 * Called when a client performs the conclude operation.
 *
 * Marks the session as concluding, installs the step timeouts and runs
 * all steps that are ready. A second conclude on the same session is
 * rejected with GNUNET_break(0).
 *
 * NOTE(review): listing lines 3313, 3322, 3325 and 3330 are absent from
 * this extract.
 *
 * @param message the conclude message (header only; not inspected in the
 *        visible lines)
 */
3312 static void
3314  const struct GNUNET_MessageHeader *message)
3315 {
3316  struct ConsensusSession *session = cls;
3317 
3318  if (GNUNET_YES == session->conclude_started)
3319  {
3320  /* conclude started twice */
3321  GNUNET_break(0);
3323  return;
3324  }
3326  "conclude requested\n");
 /* Set the flag before running steps; handle_client_insert checks it. */
3327  session->conclude_started = GNUNET_YES;
3328  install_step_timeouts(session);
3329  run_ready_steps(session);
3331 }
3332 
3333 
/**
 * Called to clean up, after a shutdown has been requested.
 *
 * Destroys the global statistics handle (second argument #GNUNET_NO) and
 * clears the pointer.
 *
 * @param cls closure (unused)
 */
3339 static void
3340 shutdown_task(void *cls)
3341 {
3343  "shutting down\n");
3344  GNUNET_STATISTICS_destroy(statistics,
3345  GNUNET_NO);
3346  statistics = NULL;
3347 }
3348 
3349 
/**
 * Start processing consensus requests.
 *
 * Stores the configuration in the file-level `cfg`, obtains the local
 * peer identity into `my_peer` and creates the "consensus" statistics
 * handle. If the identity cannot be retrieved, an error is logged and the
 * function returns before the statistics handle is created.
 *
 * NOTE(review): listing lines 3360, 3364, 3367, 3369 and 3374 are absent
 * from this extract.
 *
 * @param cls closure
 * @param c configuration to use
 */
3357 static void
3358 run(void *cls,
3359  const struct GNUNET_CONFIGURATION_Handle *c,
3361 {
3362  cfg = c;
3363  if (GNUNET_OK !=
3365  &my_peer))
3366  {
3368  "Could not retrieve host identity\n");
3370  return;
3371  }
3372  statistics = GNUNET_STATISTICS_create("consensus",
3373  cfg);
3375  NULL);
3376 }
3377 
3378 
/**
 * Callback called when a client connects to the service.
 *
 * Allocates a fresh ConsensusSession for the client, records the client
 * handle and its message queue, and links the session into the global
 * sessions list.
 *
 * NOTE(review): the listing line with the function name and first
 * parameter (3388) is absent from this extract.
 *
 * @param c the new client that connected to us
 * @param mq the message queue used to send messages to the client
 * @return the newly allocated session
 */
3387 static void *
3389  struct GNUNET_SERVICE_Client *c,
3390  struct GNUNET_MQ_Handle *mq)
3391 {
3392  struct ConsensusSession *session = GNUNET_new(struct ConsensusSession);
3393 
3394  session->client = c;
3395  session->client_mq = mq;
3396  GNUNET_CONTAINER_DLL_insert(sessions_head,
3397  sessions_tail,
3398  session);
3399  return session;
3400 }
3401 
3402 
/**
 * Callback called when a client disconnected from the service.
 *
 * Unlinks the session from the global sessions list, destroys every set
 * handle recorded in the session's set_handles list and frees the session.
 *
 * NOTE(review): listing lines 3411 and 3419 are absent from this extract.
 * NOTE(review): the visible lines do not release session->peers_blacklisted
 * or the session's hash maps -- confirm whether that happens elsewhere.
 *
 * @param c the client that disconnected
 * @param internal_cls the ConsensusSession belonging to the client
 */
3410 static void
3412  struct GNUNET_SERVICE_Client *c,
3413  void *internal_cls)
3414 {
3415  struct ConsensusSession *session = internal_cls;
3416 
3417  if (NULL != session->set_listener)
3418  {
3420  session->set_listener = NULL;
3421  }
3422  GNUNET_CONTAINER_DLL_remove(sessions_head,
3423  sessions_tail,
3424  session);
3425 
 /* Pop handles from the head one by one; only the head pointer is
    advanced, the tail pointer is left untouched since the session is
    freed right after. */
3426  while (session->set_handles_head)
3427  {
3428  struct SetHandle *sh = session->set_handles_head;
3429  session->set_handles_head = sh->next;
3430  GNUNET_SET_destroy(sh->h);
3431  GNUNET_free(sh);
3432  }
3433  GNUNET_free(session);
3434 }
3435 
3436 
/* Service entry point: registers run() as the init callback and installs
   message handlers for client conclude (fixed size), client insert and
   client join (both variable size).
   NOTE(review): the macro name line and the message-type / callback lines
   (3440, 3442, 3444-3445, 3448, 3452-3453, 3456-3457, 3459) are absent
   from this extract. */
3441  ("consensus",
3443  &run,
3446  NULL,
3447  GNUNET_MQ_hd_fixed_size(client_conclude,
3449  struct GNUNET_MessageHeader,
3450  NULL),
3451  GNUNET_MQ_hd_var_size(client_insert,
3454  NULL),
3455  GNUNET_MQ_hd_var_size(client_join,
3458  NULL),
3460 
3461 /* end of gnunet-service-consensus.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
Abort a round, don't send requested elements anymore.
uint64_t first_size
Our set size from the first round.
Vote that an element should be removed.
struct FinishCls finish
void GNUNET_CONTAINER_multihashmap_iterator_destroy(struct GNUNET_CONTAINER_MultiHashMapIterator *iter)
Destroy a multihashmap iterator.
Closure for both start_task and cancel_task.
int * peer_contested
Contestation state of the peer.
static GNUNET_NETWORK_STRUCT_END struct GNUNET_PeerIdentity me
Our own peer identity.
static int cmp_uint64_t(const void *pa, const void *pb)
struct GNUNET_CONTAINER_MultiHashMap * rfnmap
static int get_peer_idx(const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session)
Search peer in the list of peers in session.
struct GNUNET_SET_ListenHandle * GNUNET_SET_listen(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op_type, const struct GNUNET_HashCode *app_id, GNUNET_SET_ListenCallback listen_cb, void *listen_cls)
Wait for set operation requests for the given application ID.
Definition: set_api.c:1012
static struct GNUNET_SERVICE_Handle * service
Handle to our service instance.
unsigned int subordinates_cap
int GNUNET_CONTAINER_multihashmap_iterator_next(struct GNUNET_CONTAINER_MultiHashMapIterator *iter, struct GNUNET_HashCode *key, const void **value)
Retrieve the next element from the hash map at the iterator's position.
int GNUNET_SET_remove_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Remove an element from the given set.
Definition: set_api.c:729
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static struct GNUNET_VPN_RedirectionRequest * request
Opaque redirection request handle.
Definition: gnunet-vpn.c:41
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:671
static unsigned int phase
Processing stage that we are in.
Definition: gnunet-arm.c:109
static void client_insert_done(void *cls)
struct GNUNET_STATISTICS_Handle * statistics
Statistics handle.
static int check_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Check join message.
static unsigned int element_size
static void try_finish_step_early(struct Step *step)
Weighted diff.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
int GNUNET_SET_add_element(struct GNUNET_SET_Handle *set, const struct GNUNET_SET_Element *element, GNUNET_SET_Continuation cont, void *cont_cls)
Add an element to the given set.
Definition: set_api.c:682
int * peer_commited
Stores, for every peer in the session, whether the peer finished the whole referendum.
If a value with the given key exists, replace it.
Consensus element, either marker or payload.
struct GNUNET_SET_ListenHandle * set_listener
Listener for requests from other peers.
static void shutdown_task(void *cls)
Called to clean up, after a shutdown has been requested.
uint16_t payload_type
Payload element_type, only valid if this is not a marker element.
static void commit_set(struct ConsensusSession *session, struct TaskEntry *task)
Commit the appropriate set for a task.
Handle for a set operation request from another peer.
Definition: set_api.c:113
static struct GNUNET_IDENTITY_EgoLookup * el
EgoLookup.
struct SetHandle * set_handles_head
static void * client_connect_cb(void *cls, struct GNUNET_SERVICE_Client *c, struct GNUNET_MQ_Handle *mq)
Callback called when a client connects to the service.
Handle to a service.
Definition: service.c:114
static void apply_diff_to_rfn(struct DiffEntry *diff, struct ReferendumEntry *rfn, uint16_t voting_peer, uint16_t num_peers)
int16_t repetition
Repetition of the gradecast phase.
static void construct_task_graph(struct ConsensusSession *session)
Element should be added to the result set of the remote peer, i.e.
static void task_start_reconcile(struct TaskEntry *task)
static void put_diff(struct ConsensusSession *session, struct DiffEntry *diff)
struct GNUNET_CONTAINER_MultiHashMapIterator * GNUNET_CONTAINER_multihashmap_iterator_create(const struct GNUNET_CONTAINER_MultiHashMap *map)
Create an iterator for a multihashmap.
struct ReferendumEntry * rfn_create(uint16_t size)
static void task_start_grade(struct TaskEntry *task)
static void rfn_contest(struct ReferendumEntry *rfn, uint16_t contested_peer)
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run on shutdown, that is when a CTRL-C signal is received, or when GNUNET_SCHEDULER_shutdown() is being invoked.
Definition: scheduler.c:1284
static void task_start_finish(struct TaskEntry *task)
Element stored in a set.
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_SET_Handle * GNUNET_SET_create(const struct GNUNET_CONFIGURATION_Handle *cfg, enum GNUNET_SET_OperationType op)
Create an empty set, supporting the specified operation.
Definition: set_api.c:652
uint16_t is_contested
Non-zero if this set reconciliation had elements removed because they were contested.
Element should be added to the result set of the local peer, i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
struct GNUNET_SET_Handle * h
static struct ConsensusSession * sessions_tail
Linked list of sessions this peer participates in.
static const char * phasename(uint16_t phase)
struct GNUNET_CONTAINER_MultiHashMap * setmap
struct ConsensusSession * session
struct ConsensusSession * next
Consensus sessions are kept in a DLL.
struct DiffEntry * diff_compose(struct DiffEntry *diff_1, struct DiffEntry *diff_2)
static struct SetEntry * lookup_set(struct ConsensusSession *session, struct SetKey *key)
static struct ReferendumEntry * lookup_rfn(struct ConsensusSession *session, struct RfnKey *key)
static void arrange_peers(uint16_t *p1, uint16_t *p2, uint16_t n)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
static void finish_task(struct TaskEntry *task)
static int peer_id_cmp(const void *h1, const void *h2)
Compare two peer identities.
#define GNUNET_NO
Definition: gnunet_common.h:78
unsigned int num_peers
Number of other peers in the consensus.
#define GNUNET_memdup(buf, size)
Allocate and initialize a block of memory.
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
unsigned int tasks_len
#define GNUNET_new(type)
Allocate a struct or union of the given type.
struct GNUNET_SET_OperationHandle * GNUNET_SET_prepare(const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_HashCode *app_id, const struct GNUNET_MessageHeader *context_msg, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Prepare a set operation to be evaluated with another peer.
Definition: set_api.c:808
static struct GNUNET_PeerIdentity my_peer
Peer that runs this service.
static void compute_global_id(struct ConsensusSession *session, const struct GNUNET_HashCode *local_session_id)
Compute a global, (hopefully) unique consensus session id, from the local id of the consensus session...
static struct GNUNET_SCHEDULER_Task * t
Main task.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static struct TaskEntry * lookup_task(struct ConsensusSession *session, struct TaskKey *key)
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
struct GNUNET_TIME_AbsoluteNBO start
Start time for the consensus.
Definition: consensus.h:57
struct Step * next
All steps of one session are in a linked list for easier deallocation.
void GNUNET_SCHEDULER_shutdown(void)
Request the shutdown of a scheduler.
Definition: scheduler.c:517
static int ret
Final status code.
Definition: gnunet-arm.c:89
Handle for the service.
const struct GNUNET_SET_Element * element
GNUNET_SERVICE_MAIN("consensus", GNUNET_SERVICE_OPTION_NONE, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_fixed_size(client_conclude, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, struct GNUNET_MessageHeader, NULL), GNUNET_MQ_hd_var_size(client_insert, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, struct GNUNET_CONSENSUS_ElementMessage, NULL), GNUNET_MQ_hd_var_size(client_join, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, struct GNUNET_CONSENSUS_JoinMessage, NULL), GNUNET_MQ_handler_end())
Define "main" method using service macro.
Last result set from a gradecast.
#define GNUNET_strdup(a)
Wrapper around GNUNET_xstrdup_.
struct GNUNET_CONTAINER_MultiHashMap * taskmap
size_t pending_prereq
Counter for the prerequisites of this step.
static struct ConsensusSession * sessions_head
Linked list of sessions this peer participates in.
void GNUNET_SET_copy_lazy(struct GNUNET_SET_Handle *set, GNUNET_SET_CopyReadyCallback cb, void *cls)
Definition: set_api.c:1189
Success, all elements have been sent (and received).
struct GNUNET_HashCode session_id
Session id of the consensus.
Definition: consensus.h:52
Internal representation of the hash map.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
void GNUNET_SET_element_hash(const struct GNUNET_SET_Element *element, struct GNUNET_HashCode *ret_hash)
Hash a set element.
Definition: set_api.c:1241
struct SetKey input_set
#define GNUNET_NETWORK_STRUCT_BEGIN
Define as empty, GNUNET_PACKED should suffice, but this won't work on W32.
struct Step * prev
All steps of one session are in a linked list for easier deallocation.
const void * data
Actual data of the element.
Sent as context message for set reconciliation.
int * peers_blacklisted
Array of peers with length 'num_peers'.
void * GNUNET_CONTAINER_multihashmap_get(const struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key)
Given a key find a value in the map matching the key.
static void set_copy_cb(void *cls, struct GNUNET_SET_Handle *copy)
Handle to a client that is connected to a service.
Definition: service.c:246
void GNUNET_SET_destroy(struct GNUNET_SET_Handle *set)
Destroy the set handle, and free all associated resources.
Definition: set_api.c:767
static struct DiffEntry * lookup_diff(struct ConsensusSession *session, struct DiffKey *key)
uint32_t num_peers
Number of peers (at the end of this message) that want to participate in the consensus.
Definition: consensus.h:47
struct ConsensusElement ce
static void construct_task_graph_gradecast(struct ConsensusSession *session, uint16_t rep, uint16_t lead, struct Step *step_before, struct Step *step_after)
Construct the task graph for a single gradecast.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:99
static void handle_client_join(void *cls, const struct GNUNET_CONSENSUS_JoinMessage *m)
Called when a client wants to join a consensus session.
static void finish_step(struct Step *step)
struct GNUNET_PeerIdentity * peers
int GNUNET_asprintf(char **buf, const char *format,...)
Like asprintf, just portable.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_TIME_Absolute conclude_start
Time when the conclusion of the consensus should begin.
static void create_set_copy_for_task(struct TaskEntry *task, struct SetKey *src_set_key, struct SetKey *dst_set_key)
Call the start function of the given task again after we created a copy of the given set...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
void(* TaskFunc)(struct TaskEntry *task)
int16_t peer2
Number of the second peer in canonical order.
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
static struct Step * create_step(struct ConsensusSession *session, int round, int early_finishable)
Don't bother checking if a value already exists (faster than GNUNET_CONTAINER_MULTIHASHMAPOPTION_...
void GNUNET_CRYPTO_hash_create_random(enum GNUNET_CRYPTO_Quality mode, struct GNUNET_HashCode *result)
Create a random hash code.
Definition: crypto_hash.c:138
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN.
Definition: consensus.h:41
struct Step ** subordinates
The other peer refused to do the operation with us, or something went wrong.
static int check_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
struct GNUNET_CONTAINER_MultiHashMap * changes
static void diff_insert(struct DiffEntry *diff, int weight, const struct GNUNET_SET_Element *element)
static void install_step_timeouts(struct ConsensusSession *session)
struct SetHandle * set_handles_tail
uint64_t lower_bound
Bounded Eppstein lower bound.
static void set_listen_cb(void *cls, const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request)
Called when another peer wants to do a set operation with the local peer.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:44
static void handle_client_conclude(void *cls, const struct GNUNET_MessageHeader *message)
Called when a client performs the conclude operation.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct TaskEntry * task
Task to finish once all changes are through.
GNUNET_SET_Status
Status for the result callback.
Message with an element.
Definition: consensus.h:71
static struct SolverHandle * sh
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
struct DiffKey key
Vote that nothing should change.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:686
struct SetKey dst_set_key
int weight
Positive weight for 'add', negative weights for 'remove'.
uint16_t status
See PRISM_STATUS_*-constants.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
static uint16_t task_other_peer(struct TaskEntry *task)
static void cleanup(void *cls)
Function scheduled as very last function, cleans up after us.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration of the consensus service.
p2p message definitions for consensus
struct SetKey input_set
struct GNUNET_MessageHeader header
Type: Either GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT or GNUNET_MESSAGE_TYPE_CONSENSUS_C...
Definition: consensus.h:77
static void client_disconnect_cb(void *cls, struct GNUNET_SERVICE_Client *c, void *internal_cls)
Callback called when a client disconnected from the service.
void GNUNET_STATISTICS_set(struct GNUNET_STATISTICS_Handle *handle, const char *name, uint64_t value, int make_persistent)
Set statistic value for the peer.
Apply a repetition of the all-to-all gradecast to the current set.
A 512-bit hashcode.
Client gets notified of the required changes for both the local and the remote set.
void GNUNET_SERVICE_client_drop(struct GNUNET_SERVICE_Client *c)
Ask the server to disconnect from the given client.
Definition: service.c:2315
struct ConsensusSession * prev
Consensus sessions are kept in a DLL.
char * debug_name
Human-readable name for the task, used for debugging.
Opaque handle to a set.
Definition: set_api.c:48
static const char * setname(uint16_t kind)
int GNUNET_CONFIGURATION_get_value_string(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, char **value)
Get a configuration value that should be a string.
static void task_start_eval_echo(struct TaskEntry *task)
struct TaskKey key
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
There must only be one value per key; storing a value should fail if a value under the same key alrea...
static uint16_t rfn_noncontested(struct ReferendumEntry *rfn)
struct TaskEntry ** tasks
Tasks that this step is composed of.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
int early_finishable
When we're doing an early finish, how should this step be treated? If GNUNET_YES, the step will be ma...
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT
Sent by service when a new element is added.
static void step_depend_on(struct Step *step, struct Step *dep)
Record dep as a dependency of step.
struct GNUNET_CONTAINER_MultiHashMap * diffmap
int is_contested
GNUNET_YES if the set resulted from applying a referendum with contested elements.
static void initialize_session_peer_list(struct ConsensusSession *session, const struct GNUNET_CONSENSUS_JoinMessage *join_msg)
Create the sorted list of peers for the session, add the local peer if not in the join message...
struct GNUNET_HashCode global_id
Global consensus identification, computed from the session id and participating authorities.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE
Sent by client to service in order to start the consensus conclusion.
void GNUNET_SET_listen_cancel(struct GNUNET_SET_ListenHandle *lh)
Cancel the given listen operation.
Definition: set_api.c:1046
#define GNUNET_CONTAINER_DLL_insert_tail(head, tail, element)
Insert an element at the tail of a DLL.
struct GNUNET_MQ_Handle * client_mq
Queued messages to the client.
Option for set operations.
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
Sent by the client to the service, when the client wants the service to join a consensus session...
Definition: consensus.h:37
struct DiffEntry * diff_create()
static unsigned int num_peers
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT
Insert an element.
static void put_rfn(struct ConsensusSession *session, struct ReferendumEntry *rfn)
#define GNUNET_memcmp(a, b)
Compare memory in a and b, where both must be of the same pointer type.
unsigned int is_finished
struct DiffKey output_diff
int16_t leader
Leader in the gradecast phase.
Handle to an operation.
Definition: set_api.c:132
Handle to a message queue.
Definition: mq.c:84
A consensus session consists of one local client and the remote authorities.
Tuple of integers that together identify a task uniquely.
static void run_ready_steps(struct ConsensusSession *session)
#define GNUNET_NETWORK_STRUCT_END
Define as empty, GNUNET_PACKED should suffice, but this won't work on W32.
unsigned int round
struct GNUNET_HashCode rand
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
static struct GNUNET_FS_PublishContext * pc
Handle to FS-publishing operation.
The identity of the host (wraps the signing key of the peer).
int GNUNET_SET_commit(struct GNUNET_SET_OperationHandle *oh, struct GNUNET_SET_Handle *set)
Commit a set to be used with a set operation.
Definition: set_api.c:1123
static const char * diffname(uint16_t kind)
uint16_t kind
A value from 'enum PhaseKind'.
uint8_t marker
Is this a marker element?
Vote that an element should be added.
static void task_cancel_reconcile(struct TaskEntry *task)
static void set_result_cb(void *cls, const struct GNUNET_SET_Element *element, uint64_t current_size, enum GNUNET_SET_Status status)
Callback for set operation results.
#define GNUNET_PACKED
gcc-ism to get packed structs.
unsigned int finished_tasks
configuration data
Definition: configuration.c:83
struct GNUNET_MessageHeader header
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT.
struct GNUNET_SET_Handle * h
int GNUNET_CRYPTO_hash_cmp(const struct GNUNET_HashCode *h1, const struct GNUNET_HashCode *h2)
Compare function for HashCodes, producing a total ordering of all hashcodes.
Definition: crypto_hash.c:278
struct SetHandle * next
uint16_t element_type
Type: GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_NEW_ELEMENT.
Definition: consensus.h:82
Block type for consensus elements.
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE
Sent by service to client in order to signal a completed consensus conclusion.
int16_t leader
Leader in the gradecast phase.
uint16_t size
Number of bytes in the buffer pointed to by data.
struct Step * step
struct GNUNET_MQ_Handle * mq
Definition: 003.c:5
const struct GNUNET_SET_Element * element
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
unsigned int local_peer_idx
Index of the local peer in the peers array.
unsigned int is_running
#define GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN
Join a consensus session.
Opaque handle to a listen operation.
Definition: set_api.c:182
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
enum ReferendumVote proposal
Proposal for this element, can only be VOTE_ADD or VOTE_REMOVE.
int GNUNET_CRYPTO_get_peer_identity(const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_PeerIdentity *dst)
Retrieve the identity of the host's peer.
struct SetOpCls setop
unsigned int num_client_insert_pending
static const char * rfnname(uint16_t kind)
static void run(void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *service)
Start processing consensus requests.
struct GNUNET_TIME_Absolute conclude_deadline
Timeout for all rounds together, single rounds will schedule a timeout task with a fraction of the co...
void GNUNET_SET_operation_cancel(struct GNUNET_SET_OperationHandle *oh)
Cancel the given set operation.
Definition: set_api.c:511
static void handle_client_insert(void *cls, const struct GNUNET_CONSENSUS_ElementMessage *msg)
Called when a client performs an insert operation.
static int send_to_client_iter(void *cls, const struct GNUNET_SET_Element *element)
Send the final result set of the consensus to the client, element by element.
struct GNUNET_SET_Element * GNUNET_SET_element_dup(const struct GNUNET_SET_Element *element)
Create a copy of an element.
Definition: set_api.c:1218
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct GNUNET_SET_OperationHandle * op
Header for all communications.
struct GNUNET_SERVICE_Client * client
Client that inhabits the session.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_difference(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Absolute end)
Compute the time difference between the given start and end times.
Definition: time.c:353
static void rfn_commit(struct ReferendumEntry *rfn, uint16_t commit_peer)
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:351
struct RfnKey output_rfn
static void set_mutation_done(void *cls)
union TaskFuncCls cls
struct SetHandle * prev
static void rfn_majority(const struct ReferendumEntry *rfn, const struct RfnElementInfo *ri, uint16_t *ret_majority, enum ReferendumVote *ret_vote)
For a given majority, count what the outcome is (add/remove/keep), and give the number of peers that ...
unsigned int tasks_cap
int16_t peer1
Number of the first peer in canonical order.
struct ConsensusElement ce
static void start_task(struct ConsensusSession *session, struct TaskEntry *task)
Set union, return all elements that are in at least one of the sets.
int GNUNET_SET_iterate(struct GNUNET_SET_Handle *set, GNUNET_SET_ElementIterator iter, void *iter_cls)
Iterate over all elements in the given set.
Definition: set_api.c:1167
unsigned int subordinates_len
struct SetKey output_set
static void task_start_apply_round(struct TaskEntry *task)
Apply the result from one round of gradecasts (i.e.
static void put_set(struct ConsensusSession *session, struct SetEntry *set)
struct GNUNET_CONTAINER_MultiHashMap * rfn_elements
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:91
void GNUNET_SERVICE_client_continue(struct GNUNET_SERVICE_Client *c)
Continue receiving further messages from the given client.
Definition: service.c:2234
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
struct GNUNET_TIME_AbsoluteNBO deadline
Deadline for conclude.
Definition: consensus.h:62
int early_stopping
State of our early stopping scheme.
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
static void rfn_vote(struct ReferendumEntry *rfn, uint16_t voting_peer, enum ReferendumVote vote, const struct GNUNET_SET_Element *element)
static void put_task(struct GNUNET_CONTAINER_MultiHashMap *taskmap, struct TaskEntry *t)
#define GNUNET_malloc(size)
Wrapper around malloc.
int16_t peer1
Number of the first peer in canonical order.
struct SetKey key
int16_t peer2
Number of the second peer in canonical order.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
struct TaskEntry * task
#define GNUNET_free(ptr)
Wrapper around free.
uint16_t element_type
Application-specific element type.
int16_t repetition
Repetition of the gradecast phase.
struct GNUNET_SET_OperationHandle * GNUNET_SET_accept(struct GNUNET_SET_Request *request, enum GNUNET_SET_ResultMode result_mode, struct GNUNET_SET_Option options[], GNUNET_SET_ResultIterator result_cb, void *result_cls)
Accept a request we got via GNUNET_SET_listen().
Definition: set_api.c:1080
struct GNUNET_SCHEDULER_Task * timeout_task
Fail set operations when the other peer shows weird behavior that might be a Byzantine fault...
uint16_t kind
A value from 'enum PhaseKind'.