GNUnet  0.10.x
rps_api.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C)
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "rps.h"
29 #include "gnunet_rps_service.h"
30 #include "rps-sampler_client.h"
31 
32 #include "gnunet_nse_service.h"
33 
34 #include <inttypes.h>
35 
36 #define LOG(kind, ...) GNUNET_log_from(kind, "rps-api", __VA_ARGS__)
37 
46 
51 
55  void *ready_cb_cls;
56 
61 
66 
71 };
72 
73 
82 
87 
92 
97 
102 
107 
112 
117 
122 
127 
132 
137 
143 
151 };
152 
153 
162 
166  uint32_t num_requests;
167 
172 
177 
183 
188 
193 
198 
203 };
204 
205 
214 
219 
224 
230 
235 
240 
245 
250 };
251 
252 
257 struct cb_cls_pack {
262 
266  void *cls;
267 
271  struct GNUNET_CLIENT_Connection *service_conn;
272 };
273 
274 
280 
285 static uint64_t srh_callback_num_peers;
286 
287 
298 static struct GNUNET_RPS_StreamRequestHandle *
301  void *cls)
302 {
303  struct GNUNET_RPS_StreamRequestHandle *srh;
304 
306  srh->rps_handle = rps_handle;
307  srh->ready_cb = ready_cb;
308  srh->ready_cb_cls = cls;
310  rps_handle->stream_requests_tail,
311  srh);
312 
313  return srh;
314 }
315 
316 
322 static void
324 {
325  struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
326 
327  GNUNET_assert(NULL != srh);
328  if (NULL != srh->callback_task)
329  {
331  srh->callback_task = NULL;
332  }
334  rps_handle->stream_requests_tail,
335  srh);
336  GNUNET_free(srh);
337 }
338 
339 
349 static void
351  uint32_t num_peers,
352  void *cls)
353 {
354  struct GNUNET_RPS_Request_Handle *rh = cls;
355 
356  rh->sampler_rh = NULL;
357  rh->ready_cb(rh->ready_cb_cls,
358  num_peers,
359  peers);
361 }
362 
363 
375 static void
377  void *cls,
378  double probability,
379  uint32_t num_observed)
380 {
381  struct GNUNET_RPS_Request_Handle_Single_Info *rh = cls;
382 
383  rh->sampler_rh = NULL;
384  rh->ready_cb(rh->ready_cb_cls,
385  peers,
386  probability,
387  num_observed);
389 }
390 
391 
400 static void
402  uint64_t num_peers,
403  const struct GNUNET_PeerIdentity *peers)
404 {
405  struct GNUNET_RPS_Request_Handle *rh = cls;
406 
408  "Service sent %" PRIu64 " peers from stream\n",
409  num_peers);
410  for (uint64_t i = 0; i < num_peers; i++)
411  {
412  RPS_sampler_update(rh->sampler, &peers[i]);
413  }
414 }
415 
416 
427 static void
429  uint64_t num_peers,
430  const struct GNUNET_PeerIdentity *peers)
431 {
432  struct GNUNET_RPS_Request_Handle_Single_Info *rhs = cls;
433 
435  "Service sent %" PRIu64 " peers from stream\n",
436  num_peers);
437  for (uint64_t i = 0; i < num_peers; i++)
438  {
439  RPS_sampler_update(rhs->sampler, &peers[i]);
440  }
441 }
442 
443 
444 /* Get internals for debugging/profiling purposes */
445 
455 void
457  uint32_t num_updates,
459  void *cls)
460 {
461  struct GNUNET_MQ_Envelope *ev;
463 
465  "Client requests %" PRIu32 " view updates\n",
466  num_updates);
467  rps_handle->view_update_cb = view_update_cb;
468  rps_handle->view_update_cls = cls;
469 
471  msg->num_updates = htonl(num_updates);
472  GNUNET_MQ_send(rps_handle->mq, ev);
473 }
474 
475 
476 void
478 {
479  struct GNUNET_MQ_Envelope *ev;
480 
481  GNUNET_assert(NULL != rps_handle->view_update_cb);
482 
483  rps_handle->view_update_cb = NULL;
484 
486  GNUNET_MQ_send(rps_handle->mq, ev);
487 }
488 
489 
499  GNUNET_RPS_NotifyReadyCB stream_input_cb,
500  void *cls)
501 {
502  struct GNUNET_RPS_StreamRequestHandle *srh;
503  struct GNUNET_MQ_Envelope *ev;
505 
506  srh = new_stream_request(rps_handle,
507  stream_input_cb,
508  cls);
509  LOG(GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
510 
512  GNUNET_MQ_send(rps_handle->mq, ev);
513  return srh;
514 }
515 
516 
525 static int
527  const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
528 {
529  uint16_t msize = ntohs(msg->header.size);
530  uint32_t num_peers = ntohl(msg->num_peers);
531 
532  (void)cls;
533 
534  msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_ViewReply);
535  if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
536  (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
537  {
538  GNUNET_break(0);
539  return GNUNET_SYSERR;
540  }
541  return GNUNET_OK;
542 }
543 
544 
552 static void
554  const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
555 {
556  struct GNUNET_RPS_Handle *h = cls;
557  struct GNUNET_PeerIdentity *peers;
558 
559  /* Give the peers back */
561  "New view of %" PRIu32 " peers:\n",
562  ntohl(msg->num_peers));
563 
564  peers = (struct GNUNET_PeerIdentity *)&msg[1];
565  GNUNET_assert(NULL != h);
566  GNUNET_assert(NULL != h->view_update_cb);
567  h->view_update_cb(h->view_update_cls, ntohl(msg->num_peers), peers);
568 }
569 
570 
577 static void
579 {
580  struct GNUNET_MQ_Envelope *ev;
581 
583  GNUNET_MQ_send(rps_handle->mq, ev);
584 }
585 
586 
592 void
594 {
596 
597  rps_handle = srh->rps_handle;
599  if (NULL == rps_handle->stream_requests_head)
600  cancel_stream(rps_handle);
601 }
602 
603 
614 static int
616  const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
617 {
618  uint16_t msize = ntohs(msg->header.size);
619  uint32_t num_peers = ntohl(msg->num_peers);
620 
621  (void)cls;
622 
623  msize -= sizeof(struct GNUNET_RPS_CS_DEBUG_StreamReply);
624  if ((msize / sizeof(struct GNUNET_PeerIdentity) != num_peers) ||
625  (msize % sizeof(struct GNUNET_PeerIdentity) != 0))
626  {
627  GNUNET_break(0);
628  return GNUNET_SYSERR;
629  }
630  return GNUNET_OK;
631 }
632 
633 
639 static void
641 {
642  struct GNUNET_RPS_StreamRequestHandle *srh = cls;
643 
644  srh->callback_task = NULL;
645  srh->ready_cb(srh->ready_cb_cls,
647  srh_callback_peers);
648 }
649 
650 
659 static void
661  const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
662 {
663  struct GNUNET_RPS_Handle *h = cls;
664  //const struct GNUNET_PeerIdentity *peers;
665  uint64_t num_peers;
666  struct GNUNET_RPS_StreamRequestHandle *srh_iter;
667  struct GNUNET_RPS_StreamRequestHandle *srh_next;
668 
669  //peers = (struct GNUNET_PeerIdentity *) &msg[1];
670  num_peers = ntohl(msg->num_peers);
672  GNUNET_free_non_null(srh_callback_peers);
673  srh_callback_peers = GNUNET_new_array(num_peers,
674  struct GNUNET_PeerIdentity);
675  GNUNET_memcpy(srh_callback_peers,
676  &msg[1],
677  num_peers * sizeof(struct GNUNET_PeerIdentity));
679  "Received %" PRIu64 " peer(s) from stream input.\n",
680  num_peers);
681  for (srh_iter = h->stream_requests_head;
682  NULL != srh_iter;
683  srh_iter = srh_next)
684  {
685  LOG(GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
686  /* Store next pointer - srh might be removed/freed in callback */
687  srh_next = srh_iter->next;
688  if (NULL != srh_iter->callback_task)
690  srh_iter->callback_task =
692  srh_iter);
693  }
694 
695  if (NULL == h->stream_requests_head)
696  {
697  cancel_stream(h);
698  }
699 }
700 
701 
705 static void
706 reconnect(struct GNUNET_RPS_Handle *h);
707 
708 
718 static void
720  enum GNUNET_MQ_Error error)
721 {
722  struct GNUNET_RPS_Handle *h = cls;
723 
724  //TODO LOG
725  LOG(GNUNET_ERROR_TYPE_WARNING, "Problem with message queue. error: %i\n\
726  1: READ,\n\
727  2: WRITE,\n\
728  4: TIMEOUT\n",
729  // TODO: write GNUNET_MQ_strerror (error)
730  error);
731  reconnect(h);
732  /* Resend all pending request as the service destroyed its knowledge
733  * about them */
734 }
735 
736 
744 static void
745 hash_from_share_val(const char *share_val,
746  struct GNUNET_HashCode *hash)
747 {
748  GNUNET_CRYPTO_kdf(hash,
749  sizeof(struct GNUNET_HashCode),
750  "rps",
751  strlen("rps"),
752  share_val,
753  strlen(share_val),
754  NULL, 0);
755 }
756 
757 
769 static void
770 nse_cb(void *cls,
771  struct GNUNET_TIME_Absolute timestamp,
772  double logestimate,
773  double std_dev)
774 {
775  struct GNUNET_RPS_Handle *h = cls;
776 
777  (void)timestamp;
778  (void)std_dev;
779 
780  for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
781  NULL != rh_iter && NULL != rh_iter->next;
782  rh_iter = rh_iter->next)
783  {
784  RPS_sampler_update_with_nw_size(rh_iter->sampler,
785  GNUNET_NSE_log_estimate_to_n(logestimate));
786  }
787  for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
788  NULL != rhs_iter && NULL != rhs_iter->next;
789  rhs_iter = rhs_iter->next)
790  {
791  RPS_sampler_update_with_nw_size(rhs_iter->sampler,
792  GNUNET_NSE_log_estimate_to_n(logestimate));
793  }
794 }
795 
796 
800 static void
802 {
803  struct GNUNET_MQ_MessageHandler mq_handlers[] = {
807  h),
811  h),
813  };
814 
815  if (NULL != h->mq)
816  GNUNET_MQ_destroy(h->mq);
817  h->mq = GNUNET_CLIENT_connect(h->cfg,
818  "rps",
819  mq_handlers,
821  h);
822  if (NULL != h->nse)
824  h->nse = GNUNET_NSE_connect(h->cfg, &nse_cb, h);
825 }
826 
827 
834 struct GNUNET_RPS_Handle *
836 {
837  struct GNUNET_RPS_Handle *h;
838 
839  h = GNUNET_new(struct GNUNET_RPS_Handle);
840  h->cfg = cfg;
841  if (GNUNET_OK !=
843  "RPS",
844  "DESIRED_PROBABILITY",
845  &h->desired_probability))
846  {
848  "RPS", "DESIRED_PROBABILITY");
849  GNUNET_free(h);
850  return NULL;
851  }
852  if (0 > h->desired_probability ||
853  1 < h->desired_probability)
854  {
856  "The desired probability must be in the interval [0;1]\n");
857  GNUNET_free(h);
858  return NULL;
859  }
860  if (GNUNET_OK !=
862  "RPS",
863  "DEFICIENCY_FACTOR",
864  &h->deficiency_factor))
865  {
867  "RPS", "DEFICIENCY_FACTOR");
868  GNUNET_free(h);
869  return NULL;
870  }
871  if (0 > h->desired_probability ||
872  1 < h->desired_probability)
873  {
875  "The deficiency factor must be in the interval [0;1]\n");
876  GNUNET_free(h);
877  return NULL;
878  }
879  reconnect(h);
880  if (NULL == h->mq)
881  {
882  GNUNET_free(h);
883  return NULL;
884  }
885  return h;
886 }
887 
888 
895 void
897  const char *shared_value)
898 {
900  struct GNUNET_MQ_Envelope *ev;
901 
903  hash_from_share_val(shared_value, &msg->hash);
904  msg->round_interval = GNUNET_TIME_relative_hton( // TODO read from config!
907 
908  GNUNET_MQ_send(h->mq, ev);
909 }
910 
911 
918 void
920  const char *shared_value)
921 {
923  struct GNUNET_MQ_Envelope *ev;
924 
926  hash_from_share_val(shared_value, &msg->hash);
927 
928  GNUNET_MQ_send(h->mq, ev);
929 }
930 
931 
943  uint32_t num_req_peers,
945  void *cls)
946 {
947  struct GNUNET_RPS_Request_Handle *rh;
948 
950  "Client requested %" PRIu32 " peers\n",
951  num_req_peers);
953  rh->rps_handle = rps_handle;
954  rh->num_requests = num_req_peers;
955  rh->sampler = RPS_sampler_mod_init(num_req_peers,
956  GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
958  rps_handle->desired_probability);
960  rps_handle->deficiency_factor);
962  num_req_peers,
964  rh);
965  rh->srh = GNUNET_RPS_stream_request(rps_handle,
967  rh); /* cls */
968  rh->ready_cb = ready_cb;
969  rh->ready_cb_cls = cls;
971  rps_handle->rh_tail,
972  rh);
973 
974  return rh;
975 }
976 
977 
989  void *cls)
990 {
992  uint32_t num_req_peers = 1;
993 
995  "Client requested peer with additional info\n");
997  rhs->rps_handle = rps_handle;
998  rhs->sampler = RPS_sampler_mod_init(num_req_peers,
999  GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff
1001  rps_handle->desired_probability);
1003  rps_handle->deficiency_factor);
1006  rhs);
1007  rhs->srh = GNUNET_RPS_stream_request(rps_handle,
1009  rhs); /* cls */
1010  rhs->ready_cb = ready_cb;
1011  rhs->ready_cb_cls = cls;
1013  rps_handle->rhs_tail,
1014  rhs);
1015 
1016  return rhs;
1017 }
1018 
1019 
1027 void
1029  uint32_t n,
1030  const struct GNUNET_PeerIdentity *ids)
1031 {
1032  size_t size_needed;
1033  uint32_t num_peers_max;
1034  const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1035  struct GNUNET_MQ_Envelope *ev;
1037 
1039  "Client wants to seed %" PRIu32 " peers:\n",
1040  n);
1041  for (unsigned int i = 0; i < n; i++)
1043  "%u. peer: %s\n",
1044  i,
1045  GNUNET_i2s(&ids[i]));
1046 
1047  /* The actual size the message occupies */
1048  size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1049  n * sizeof(struct GNUNET_PeerIdentity);
1050  /* The number of peers that fits in one message together with
1051  * the respective header */
1052  num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1053  sizeof(struct GNUNET_RPS_CS_SeedMessage)) /
1054  sizeof(struct GNUNET_PeerIdentity);
1055  tmp_peer_pointer = ids;
1056 
1057  while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1058  {
1059  ev = GNUNET_MQ_msg_extra(msg,
1060  num_peers_max * sizeof(struct GNUNET_PeerIdentity),
1062  msg->num_peers = htonl(num_peers_max);
1063  GNUNET_memcpy(&msg[1],
1064  tmp_peer_pointer,
1065  num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1066  GNUNET_MQ_send(h->mq,
1067  ev);
1068  n -= num_peers_max;
1069  size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1070  n * sizeof(struct GNUNET_PeerIdentity);
1071  /* Set pointer to beginning of next block of num_peers_max peers */
1072  tmp_peer_pointer = &ids[num_peers_max];
1073  }
1074 
1075  ev = GNUNET_MQ_msg_extra(msg,
1076  n * sizeof(struct GNUNET_PeerIdentity),
1078  msg->num_peers = htonl(n);
1079  GNUNET_memcpy(&msg[1],
1080  tmp_peer_pointer,
1081  n * sizeof(struct GNUNET_PeerIdentity));
1082  GNUNET_MQ_send(h->mq,
1083  ev);
1084 }
1085 
1086 
1087 #if ENABLE_MALICIOUS
1088 
1102 void
1103 GNUNET_RPS_act_malicious(struct GNUNET_RPS_Handle *h,
1104  uint32_t type,
1105  uint32_t num_peers,
1106  const struct GNUNET_PeerIdentity *peer_ids,
1107  const struct GNUNET_PeerIdentity *target_peer)
1108 {
1109  size_t size_needed;
1110  uint32_t num_peers_max;
1111  const struct GNUNET_PeerIdentity *tmp_peer_pointer;
1112  struct GNUNET_MQ_Envelope *ev;
1113  struct GNUNET_RPS_CS_ActMaliciousMessage *msg;
1114 
1115  unsigned int i;
1116 
1118  "Client turns malicious (type %" PRIu32 ") with %" PRIu32 " other peers:\n",
1119  type,
1120  num_peers);
1121  for (i = 0; i < num_peers; i++)
1123  "%u. peer: %s\n",
1124  i,
1125  GNUNET_i2s(&peer_ids[i]));
1126 
1127  /* The actual size the message would occupy */
1128  size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1129  num_peers * sizeof(struct GNUNET_PeerIdentity);
1130  /* The number of peers that fit in one message together with
1131  * the respective header */
1132  num_peers_max = (GNUNET_MAX_MESSAGE_SIZE -
1133  sizeof(struct GNUNET_RPS_CS_SeedMessage)) /
1134  sizeof(struct GNUNET_PeerIdentity);
1135  tmp_peer_pointer = peer_ids;
1136 
1137  while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
1138  {
1140  "Too many peers to send at once, sending %" PRIu32 " (all we can so far)\n",
1141  num_peers_max);
1142  ev = GNUNET_MQ_msg_extra(msg,
1143  num_peers_max * sizeof(struct GNUNET_PeerIdentity),
1144  GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1145  msg->type = htonl(type);
1146  msg->num_peers = htonl(num_peers_max);
1147  if ((2 == type) ||
1148  (3 == type))
1149  msg->attacked_peer = peer_ids[num_peers];
1150  GNUNET_memcpy(&msg[1],
1151  tmp_peer_pointer,
1152  num_peers_max * sizeof(struct GNUNET_PeerIdentity));
1153 
1154  GNUNET_MQ_send(h->mq, ev);
1155 
1156  num_peers -= num_peers_max;
1157  size_needed = sizeof(struct GNUNET_RPS_CS_SeedMessage) +
1158  num_peers * sizeof(struct GNUNET_PeerIdentity);
1159  /* Set pointer to beginning of next block of num_peers_max peers */
1160  tmp_peer_pointer = &peer_ids[num_peers_max];
1161  }
1162 
1163  ev = GNUNET_MQ_msg_extra(msg,
1164  num_peers * sizeof(struct GNUNET_PeerIdentity),
1165  GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS);
1166  msg->type = htonl(type);
1167  msg->num_peers = htonl(num_peers);
1168  if ((2 == type) ||
1169  (3 == type))
1170  msg->attacked_peer = *target_peer;
1171  GNUNET_memcpy(&msg[1],
1172  tmp_peer_pointer,
1173  num_peers * sizeof(struct GNUNET_PeerIdentity));
1174 
1175  GNUNET_MQ_send(h->mq, ev);
1176 }
1177 #endif /* ENABLE_MALICIOUS */
1178 
1179 
1185 void
1187 {
1188  struct GNUNET_RPS_Handle *h;
1189 
1190  h = rh->rps_handle;
1191  GNUNET_assert(NULL != rh);
1192  GNUNET_assert(NULL != rh->srh);
1193  GNUNET_assert(h == rh->srh->rps_handle);
1195  rh->srh = NULL;
1196  if (NULL == h->stream_requests_head)
1197  cancel_stream(h);
1198  if (NULL != rh->sampler_rh)
1199  {
1201  }
1203  rh->sampler = NULL;
1205  h->rh_tail,
1206  rh);
1207  GNUNET_free(rh);
1208 }
1209 
1210 
1216 void
1219 {
1220  struct GNUNET_RPS_Handle *h;
1221 
1222  h = rhs->rps_handle;
1223  GNUNET_assert(NULL != rhs);
1224  GNUNET_assert(NULL != rhs->srh);
1225  GNUNET_assert(h == rhs->srh->rps_handle);
1227  rhs->srh = NULL;
1228  if (NULL == h->stream_requests_head)
1229  cancel_stream(h);
1230  if (NULL != rhs->sampler_rh)
1231  {
1233  }
1235  rhs->sampler = NULL;
1237  h->rhs_tail,
1238  rhs);
1239  GNUNET_free(rhs);
1240 }
1241 
1242 
1248 void
1250 {
1251  if (NULL != h->stream_requests_head)
1252  {
1253  struct GNUNET_RPS_StreamRequestHandle *srh_next;
1254 
1256  "Still waiting for replies\n");
1257  for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
1258  NULL != srh_iter;
1259  srh_iter = srh_next)
1260  {
1261  srh_next = srh_iter->next;
1262  GNUNET_RPS_stream_cancel(srh_iter);
1263  }
1264  }
1265  if (NULL != h->rh_head)
1266  {
1268  "Not all requests were cancelled!\n");
1269  for (struct GNUNET_RPS_Request_Handle *rh_iter = h->rh_head;
1270  h->rh_head != NULL;
1271  rh_iter = h->rh_head)
1272  {
1273  GNUNET_RPS_request_cancel(rh_iter);
1274  }
1275  }
1276  if (NULL != h->rhs_head)
1277  {
1279  "Not all requests were cancelled!\n");
1280  for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head;
1281  h->rhs_head != NULL;
1282  rhs_iter = h->rhs_head)
1283  {
1285  }
1286  }
1287  if (NULL != srh_callback_peers)
1288  {
1289  GNUNET_free(srh_callback_peers);
1290  srh_callback_peers = NULL;
1291  }
1292  if (NULL != h->view_update_cb)
1293  {
1295  "Still waiting for view updates\n");
1297  }
1298  if (NULL != h->nse)
1300  GNUNET_MQ_destroy(h->mq);
1301  GNUNET_free(h);
1302 }
1303 
1304 
1305 /* end of rps_api.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
struct GNUNET_RPS_Request_Handle_Single_Info * rhs_head
Pointer to the head element in DLL of single request handles.
Definition: rps_api.c:131
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY
Send peer of biased stream.
static void collect_peers_cb(void *cls, uint64_t num_peers, const struct GNUNET_PeerIdentity *peers)
Callback to collect the peers from the biased stream and put those into the sampler.
Definition: rps_api.c:401
#define GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START
RPS client-service message to start a sub sampler.
struct GNUNET_RPS_Request_Handle * GNUNET_RPS_request_peers(struct GNUNET_RPS_Handle *rps_handle, uint32_t num_req_peers, GNUNET_RPS_NotifyReadyCB ready_cb, void *cls)
Request n random peers.
Definition: rps_api.c:942
Handle for a request to get peers from biased stream of ids.
Definition: rps_api.c:41
Message from service to client containing current update of view.
Definition: rps.h:180
struct GNUNET_RPS_Request_Handle * rh_tail
Pointer to the tail element in DLL of request handles.
Definition: rps_api.c:126
struct GNUNET_MessageHeader header
Header including size and type in NBO.
Definition: rps.h:216
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static void nse_cb(void *cls, struct GNUNET_TIME_Absolute timestamp, double logestimate, double std_dev)
Callback for network size estimate - called with new estimates about the network size, updates all samplers with the new estimate.
Definition: rps_api.c:770
Message from client to service indicating that clients wants to get stream of biased peers...
Definition: rps.h:202
struct GNUNET_RPS_StreamRequestHandle * srh
Request handle of the request of the biased stream of peers - needed to cancel the request...
Definition: rps_api.c:182
static void reconnect(struct GNUNET_RPS_Handle *h)
Reconnect to the service.
Definition: rps_api.c:801
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
GNUNET_RPS_NotifyReadyCB ready_cb
The callback to be called when we receive an answer.
Definition: rps_api.c:187
uint32_t num_updates
Number of updates 0 for sending updates until cancellation.
Definition: rps.h:174
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:900
void GNUNET_RPS_seed_ids(struct GNUNET_RPS_Handle *h, uint32_t n, const struct GNUNET_PeerIdentity *ids)
Seed rps service with peerIDs.
Definition: rps_api.c:1028
struct RPS_Sampler * sampler
The Sampler for the client request.
Definition: rps_api.c:171
Message from client to service indicating that clients wants to get updates of the view...
Definition: rps.h:164
GNUNET_MQ_Error
Error codes for the queue.
struct GNUNET_RPS_Request_Handle * rh_head
Pointer to the head element in DLL of request handles.
Definition: rps_api.c:121
#define GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP
RPS client-service message to stop a sub sampler.
void GNUNET_RPS_view_request(struct GNUNET_RPS_Handle *rps_handle, uint32_t num_updates, GNUNET_RPS_NotifyReadyCB view_update_cb, void *cls)
Request updates of view.
Definition: rps_api.c:456
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST
Request updates of the view.
void RPS_sampler_set_deficiency_factor(struct RPS_Sampler *sampler, double deficiency_factor)
Set the deficiency factor.
GNUNET_RPS_NotifyReadyCB ready_cb
The callback to be called when we receive an answer.
Definition: rps_api.c:50
client sampler implementation
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
void RPS_sampler_set_desired_probability(struct RPS_Sampler *sampler, double desired_probability)
Set the probability that is needed at least with what a sampler element has to have observed all elem...
#define GNUNET_TIME_UNIT_SECONDS
One second.
struct GNUNET_NSE_Handle * GNUNET_NSE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_NSE_Callback func, void *func_cls)
Connect to the network size estimation service.
Definition: nse_api.c:164
void(* GNUNET_RPS_NotifyReadyCB)(void *cls, uint64_t num_peers, const struct GNUNET_PeerIdentity *peers)
Callback called when requested random peers are available.
uint64_t rel_value_us__
The actual value (in network byte order).
struct GNUNET_RPS_Request_Handle_Single_Info * next
Pointer to next element in DLL.
Definition: rps_api.c:244
static int stream_input
Do we want to receive updates of the view? (Option –view)
Definition: gnunet-rps.c:56
void RPS_sampler_request_single_info_cancel(struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle)
Cancel a request issued through RPS_sampler_sinlge_info_ready_cb.
float deficiency_factor
A factor that catches the 'bias' of a random stream of peer ids.
Definition: rps_api.c:150
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
int GNUNET_CONFIGURATION_get_value_float(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, float *number)
Get a configuration value that should be a floating point number.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
void RPS_sampler_update(struct RPS_Sampler *sampler, const struct GNUNET_PeerIdentity *id)
Update every sampler element of this sampler with given peer.
void GNUNET_RPS_stream_cancel(struct GNUNET_RPS_StreamRequestHandle *srh)
Cancel a specific request for updates from the biased peer stream.
Definition: rps_api.c:593
static void peers_ready_cb(const struct GNUNET_PeerIdentity *peers, uint32_t num_peers, void *cls)
Called once the sampler has collected all requested peers.
Definition: rps_api.c:350
void * ready_cb_cls
The closure for the callback.
Definition: rps_api.c:239
static void cancel_stream(struct GNUNET_RPS_Handle *rps_handle)
Send message to service that this client does not want to receive further updates from the biased pee...
Definition: rps_api.c:578
static void remove_stream_request(struct GNUNET_RPS_StreamRequestHandle *srh)
Remove the given stream request from the list of requests and memory.
Definition: rps_api.c:323
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
static void collect_peers_info_cb(void *cls, uint64_t num_peers, const struct GNUNET_PeerIdentity *peers)
Callback to collect the peers from the biased stream and put those into the sampler.
Definition: rps_api.c:428
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY
Send update of the view.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
example IPC messages between RPS API and GNS service
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_RPS_disconnect(struct GNUNET_RPS_Handle *h)
Disconnect from the rps service.
Definition: rps_api.c:1249
static void handle_view_update(void *cls, const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
This function is called, when the service updated its view.
Definition: rps_api.c:553
Handler to handle requests from a client.
Definition: rps_api.c:77
struct GNUNET_RPS_Handle * rps_handle
The client issuing the request.
Definition: rps_api.c:213
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
GNUNET_RPS_NotifyReadyCB cb
Callback provided by the client.
Definition: rps_api.c:261
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:94
void GNUNET_RPS_sub_stop(struct GNUNET_RPS_Handle *h, const char *shared_value)
Stop a sub with the given shared value.
Definition: rps_api.c:919
static void srh_callback_scheduled(void *cls)
Called by the scheduler to call the callbacks of the srh handlers.
Definition: rps_api.c:640
Message from client to service with seed of peers.
Definition: rps.h:66
Closure to _get_n_rand_peers_ready_cb()
struct GNUNET_RPS_Handle * GNUNET_RPS_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the rps service.
Definition: rps_api.c:835
void * cls
Closure provided by the client.
Definition: rps_api.c:266
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct RPS_Sampler * RPS_sampler_mod_init(size_t init_size, struct GNUNET_TIME_Relative max_round_interval)
Initialise a modified tuple of sampler elements.
struct GNUNET_CLIENT_Connection * service_conn
Handle to the service connection.
Definition: rps_api.c:271
GNUNET_RPS_NotifyReadyCB view_update_cb
Callback called on each update of the view.
Definition: rps_api.c:91
uint64_t num_peers
Number of peers.
Definition: rps.h:221
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL
Cancel getting updates of the view.
struct GNUNET_RPS_Request_Handle * prev
Pointer to previous element in DLL.
Definition: rps_api.c:202
void GNUNET_log_config_missing(enum GNUNET_ErrorType kind, const char *section, const char *option)
Log error message about missing configuration option.
void GNUNET_RPS_request_single_info_cancel(struct GNUNET_RPS_Request_Handle_Single_Info *rhs)
Cancel an issued single info request.
Definition: rps_api.c:1217
struct GNUNET_MQ_Handle * mq
The message queue to the client.
Definition: rps_api.c:86
static struct GNUNET_PeerIdentity * peer_ids
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
struct GNUNET_RPS_Handle * rps_handle
The client issuing the request.
Definition: rps_api.c:161
static struct GNUNET_CONTAINER_MultiPeerMap * ids
GNUNET_PeerIdentity -> CadetPeer.
void view_update_cb(void *cls, uint64_t view_size, const struct GNUNET_PeerIdentity *peers)
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1264
struct GNUNET_RPS_Request_Handle * next
Pointer to next element in DLL.
Definition: rps_api.c:197
static struct GNUNET_RPS_StreamRequestHandle * new_stream_request(struct GNUNET_RPS_Handle *rps_handle, GNUNET_RPS_NotifyReadyCB ready_cb, void *cls)
Create a new handle for a stream request.
Definition: rps_api.c:299
struct GNUNET_HashCode hash
Length of the shared value represented as string.
Definition: rps.h:154
static void peer_info_ready_cb(const struct GNUNET_PeerIdentity *peers, void *cls, double probability, uint32_t num_observed)
Called once the sampler has collected the requested peer.
Definition: rps_api.c:376
struct GNUNET_RPS_StreamRequestHandle * stream_requests_tail
Tail of the DLL of stream requests.
Definition: rps_api.c:111
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
Message from client to service telling it to stop a new sub.
Definition: rps.h:145
void RPS_sampler_destroy(struct RPS_Sampler *sampler)
Cleans the samplers.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:440
struct RPS_SamplerRequestHandle * RPS_sampler_get_n_rand_peers(struct RPS_Sampler *sampler, uint32_t num_peers, RPS_sampler_n_rand_peers_ready_cb cb, void *cls)
Get n random peers out of the sampled peers.
void * stream_input_cls
Closure to each requested peer from the biased stream.
Definition: rps_api.c:101
Closure to _get_rand_peer_info()
struct GNUNET_RPS_Request_Handle_Single_Info * GNUNET_RPS_request_peer_info(struct GNUNET_RPS_Handle *rps_handle, GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, void *cls)
Request one random peer, getting additional information.
Definition: rps_api.c:987
struct RPS_SamplerRequestHandle * sampler_rh
Request handle of the request to the sampler - needed to cancel the request.
Definition: rps_api.c:176
struct GNUNET_RPS_StreamRequestHandle * stream_requests_head
Head of the DLL of stream requests.
Definition: rps_api.c:106
#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED
RPS CS SEED Message for the Client to seed peers into rps.
void RPS_sampler_update_with_nw_size(struct RPS_Sampler *sampler, uint32_t num_peers)
Update the current estimate of the network size stored at the sampler.
A 512-bit hashcode.
Message handler for a specific message type.
static int view_update
Do we want to receive updates of the view? (Option –view)
Definition: gnunet-rps.c:51
struct RPS_SamplerRequestHandleSingleInfo * RPS_sampler_get_rand_peer_info(struct RPS_Sampler *sampler, RPS_sampler_sinlge_info_ready_cb cb, void *cls)
Get one random peer with additional information.
Handler for a single request from a client.
Definition: rps_api.c:209
Message from service to client containing peer from biased stream.
Definition: rps.h:212
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL
Cancel getting biased stream.
void GNUNET_NSE_disconnect(struct GNUNET_NSE_Handle *h)
Disconnect from network size estimation service.
Definition: nse_api.c:192
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:104
void * ready_cb_cls
The closure for the callback.
Definition: rps_api.c:192
Message from client to service telling it to start a new sub.
Definition: rps.h:119
uint32_t num_requests
The number of requested peers.
Definition: rps_api.c:166
Handle for talking with the NSE service.
Definition: nse_api.c:40
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static void hash_from_share_val(const char *share_val, struct GNUNET_HashCode *hash)
Create the hash value from the share value that defines the sub (-group)
Definition: rps_api.c:745
struct GNUNET_TIME_RelativeNBO GNUNET_TIME_relative_hton(struct GNUNET_TIME_Relative a)
Convert relative time to network byte order.
Definition: time.c:623
struct GNUNET_RPS_StreamRequestHandle * srh
Request handle of the request of the biased stream of peers - needed to cancel the request...
Definition: rps_api.c:229
uint32_t num_peers
Number of peers.
Definition: rps.h:75
static unsigned int num_peers
static struct GNUNET_PeerIdentity * target_peer
ID of the targeted peer.
struct RPS_SamplerRequestHandleSingleInfo * sampler_rh
Request handle of the request to the sampler - needed to cancel the request.
Definition: rps_api.c:223
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message...
struct GNUNET_MessageHeader header
Header including size and type in NBO.
Definition: rps.h:184
Handle to a message queue.
Definition: mq.c:84
#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST
Request biased input stream.
GNUNET_RPS_NotifyReadySingleInfoCB ready_cb
The callback to be called when we receive an answer.
Definition: rps_api.c:234
struct GNUNET_TIME_RelativeNBO round_interval
Mean interval between two rounds.
Definition: rps.h:133
struct GNUNET_SCHEDULER_Task * callback_task
Scheduler task for scheduled callback.
Definition: rps_api.c:60
void GNUNET_RPS_view_request_cancel(struct GNUNET_RPS_Handle *rps_handle)
Definition: rps_api.c:477
#define GNUNET_MQ_msg_header(type)
Allocate a GNUNET_MQ_Envelope, where the message only consists of a header.
Definition: gnunet_mq_lib.h:76
The identity of the host (wraps the signing key of the peer).
const struct GNUNET_CONFIGURATION_Handle * cfg
The handle to the client configuration.
Definition: rps_api.c:81
struct RPS_Sampler * sampler
The Sampler for the client request.
Definition: rps_api.c:218
Struct used to pack the callback, its closure (provided by the caller) and the connection handler to ...
Definition: rps_api.c:257
void GNUNET_RPS_request_cancel(struct GNUNET_RPS_Request_Handle *rh)
Cancel an issued request.
Definition: rps_api.c:1186
configuration data
Definition: configuration.c:83
struct GNUNET_NSE_Handle * nse
Handle to nse service.
Definition: rps_api.c:116
static struct CadetPeer * peers
Operation to get peer ids.
static struct GNUNET_PeerIdentity * srh_callback_peers
Peers received from the biased stream to be passed to all srh_handlers.
Definition: rps_api.c:279
void * view_update_cls
Closure to each requested update of the view.
Definition: rps_api.c:96
struct GNUNET_RPS_StreamRequestHandle * prev
Previous element of the DLL.
Definition: rps_api.c:70
struct GNUNET_RPS_Request_Handle_Single_Info * rhs_tail
Pointer to the tail element in DLL of single request handles.
Definition: rps_api.c:136
Entry in list of pending tasks.
Definition: scheduler.c:131
struct GNUNET_RPS_StreamRequestHandle * GNUNET_RPS_stream_request(struct GNUNET_RPS_Handle *rps_handle, GNUNET_RPS_NotifyReadyCB stream_input_cb, void *cls)
Request biased stream of peers that are being put into the sampler.
Definition: rps_api.c:498
struct GNUNET_RPS_Request_Handle_Single_Info * prev
Pointer to previous element in DLL.
Definition: rps_api.c:249
void GNUNET_RPS_sub_start(struct GNUNET_RPS_Handle *h, const char *shared_value)
Start a sub with the given shared value.
Definition: rps_api.c:896
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
Error handler for mq.
Definition: rps_api.c:719
static void handle_stream_input(void *cls, const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
This function is called, when the service sends another peer from the biased stream.
Definition: rps_api.c:660
void * ready_cb_cls
The closure for the callback.
Definition: rps_api.c:55
struct GNUNET_RPS_StreamRequestHandle * next
Next element of the DLL.
Definition: rps_api.c:65
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Handler for a single request from a client.
Definition: rps_api.c:157
float desired_probability
The desired probability with which we want to have observed all peers.
Definition: rps_api.c:142
Time for absolute times used by GNUnet, in microseconds.
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:821
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:351
void RPS_sampler_request_cancel(struct RPS_SamplerRequestHandle *req_handle)
Cancel a request issued through RPS_sampler_n_rand_peers_ready_cb.
uint64_t num_peers
Number of peers in the view.
Definition: rps.h:194
#define GNUNET_NSE_log_estimate_to_n(loge)
Convert the logarithmic estimate returned to the 'GNUNET_NSE_Callback' into an absolute estimate in ...
static int check_stream_input(void *cls, const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
This function is called, when the service sends another peer from the biased stream.
Definition: rps_api.c:615
int GNUNET_CRYPTO_kdf(void *result, size_t out_len, const void *xts, size_t xts_len, const void *skm, size_t skm_len,...)
Derive key.
Definition: crypto_kdf.c:91
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
Sampler with its own array of SamplerElements.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
static uint64_t srh_callback_num_peers
Number of peers in the biased stream that are to be passed to all srh_handlers.
Definition: rps_api.c:285
void(* GNUNET_RPS_NotifyReadySingleInfoCB)(void *cls, const struct GNUNET_PeerIdentity *peer, double probability, uint32_t num_observed)
Callback called when requested random peer with additional information is available.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_RPS_Handle * rps_handle
The client issuing the request.
Definition: rps_api.c:45
#define LOG(kind,...)
Definition: rps_api.c:36
static int check_view_update(void *cls, const struct GNUNET_RPS_CS_DEBUG_ViewReply *msg)
This function is called, when the service updates the view.
Definition: rps_api.c:526
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956
struct GNUNET_HashCode hash
Hash derived from the shared value (given as a string) that defines the sub.
Definition: rps.h:138