GNUnet  0.11.x
gnunet-service-fs_pr.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2009-2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
32 #include "gnunet-service-fs_pe.h"
33 #include "gnunet-service-fs_pr.h"
35 
36 
40 #define DHT_GET_REPLICATION 5
41 
47 #define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
48 
55 #define CONTENT_BANDWIDTH_VALUE 800
56 
60 #define MAX_RESULTS (100 * 1024)
61 
65 #define INSANE_STATISTICS GNUNET_NO
66 
71 #define CADET_RETRY_MAX 3
72 
73 
78 {
83 
88 
92  void *rh_cls;
93 
98 
103 
108 
113 
118 
123 
129 
134 
139 
145 
151 
156 
161 
166 
170  bool seen_null;
171 
176  uint64_t first_uid;
177 
181  size_t result_count;
182 
187  unsigned int cadet_retry_count;
188 
192  unsigned int replies_seen_count;
193 
197  unsigned int replies_seen_size;
198 };
199 
200 
206 
207 
212 
213 
218 
219 
229 
230 
236 static unsigned long long max_pending_requests = (32 * 1024);
237 
238 
249 static void
251 {
252  if (NULL != pr->bg)
253  {
255  pr->bg = NULL;
256  }
257  if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
258  return; /* no need */
259  pr->bg =
261  type,
264  UINT32_MAX),
265  NULL,
266  0,
267  "seen-set-size",
268  pr->replies_seen_count,
269  NULL);
270  if (NULL == pr->bg)
271  return;
274  pr->replies_seen,
275  pr->replies_seen_count));
276 }
277 
278 
300 struct GSF_PendingRequest *
302  enum GNUNET_BLOCK_Type type,
303  const struct GNUNET_HashCode *query,
304  const struct GNUNET_PeerIdentity *target,
305  const char *bf_data,
306  size_t bf_size,
307  uint32_t mingle,
308  uint32_t anonymity_level,
309  uint32_t priority,
310  int32_t ttl,
313  const struct GNUNET_HashCode *replies_seen,
314  unsigned int replies_seen_count,
316  void *rh_cls)
317 {
318  struct GSF_PendingRequest *pr;
319  struct GSF_PendingRequest *dpr;
320  size_t extra;
321  struct GNUNET_HashCode *eptr;
322 
324  "Creating request handle for `%s' of type %d\n",
325  GNUNET_h2s (query),
326  type);
327 #if INSANE_STATISTICS
329  gettext_noop ("# Pending requests created"),
330  1,
331  GNUNET_NO);
332 #endif
333  extra = 0;
334  if (NULL != target)
335  extra += sizeof(struct GNUNET_PeerIdentity);
336  pr = GNUNET_malloc (sizeof(struct GSF_PendingRequest) + extra);
337  pr->public_data.query = *query;
338  eptr = (struct GNUNET_HashCode *) &pr[1];
339  if (NULL != target)
340  {
341  pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
342  GNUNET_memcpy (eptr, target, sizeof(struct GNUNET_PeerIdentity));
343  }
345  pr->public_data.priority = priority;
346  pr->public_data.original_priority = priority;
348  pr->public_data.type = type;
350  pr->sender_pid = sender_pid;
351  pr->origin_pid = origin_pid;
352  pr->rh = rh;
353  pr->rh_cls = rh_cls;
354  GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
355  if (ttl >= 0)
358  else
362  (uint32_t) (-ttl)));
363  if (replies_seen_count > 0)
364  {
366  pr->replies_seen =
369  replies_seen,
370  replies_seen_count * sizeof(struct GNUNET_HashCode));
372  }
373  if ((NULL != bf_data) &&
375  {
377  pr->public_data.type,
378  mingle,
379  bf_data,
380  bf_size,
381  "seen-set-size",
382  0,
383  NULL);
384  }
385  else if ((replies_seen_count > 0) &&
386  (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
387  {
389  }
391  &pr->public_data.query,
392  pr,
394  if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
395  {
396  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
397  pr,
399  /* make sure we don't track too many requests */
400  while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
402  {
403  dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
404  GNUNET_assert (NULL != dpr);
405  if (pr == dpr)
406  break; /* let the request live briefly... */
407  if (NULL != dpr->rh)
408  dpr->rh (dpr->rh_cls,
410  dpr,
411  UINT32_MAX,
415  NULL,
416  0);
418  }
419  }
421  gettext_noop ("# Pending requests active"),
422  1,
423  GNUNET_NO);
424  return pr;
425 }
426 
427 
434 struct GSF_PendingRequestData *
436 {
437  return &pr->public_data;
438 }
439 
440 
450 int
452  struct GSF_PendingRequest *prb)
453 {
454  if ((pra->public_data.type != prb->public_data.type) ||
455  (0 != memcmp (&pra->public_data.query,
456  &prb->public_data.query,
457  sizeof(struct GNUNET_HashCode))))
458  return GNUNET_NO;
459  return GNUNET_OK;
460 }
461 
462 
471 void
473  const struct GNUNET_HashCode *replies_seen,
474  unsigned int replies_seen_count)
475 {
476  if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
477  return; /* integer overflow */
479  {
480  /* we're responsible for the BF, full refresh */
481  if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
483  pr->replies_seen_size,
484  replies_seen_count + pr->replies_seen_count);
486  replies_seen,
487  sizeof(struct GNUNET_HashCode) * replies_seen_count);
490  }
491  else
492  {
493  if (NULL == pr->bg)
494  {
495  /* we're not the initiator, but the initiator did not give us
496  * any bloom-filter, so we need to create one on-the-fly */
498  }
499  else
500  {
503  replies_seen,
504  pr->replies_seen_count));
505  }
506  }
507  if (NULL != pr->gh)
509  replies_seen_count,
510  replies_seen);
511 }
512 
513 
521 struct GNUNET_MQ_Envelope *
523 {
524  struct GNUNET_MQ_Envelope *env;
525  struct GetMessage *gm;
526  struct GNUNET_PeerIdentity *ext;
527  unsigned int k;
528  uint32_t bm;
529  uint32_t prio;
530  size_t bf_size;
531  struct GNUNET_TIME_Absolute now;
532  int64_t ttl;
533  int do_route;
534  void *bf_data;
535  uint32_t bf_nonce;
536 
538  "Building request message for `%s' of type %d\n",
540  pr->public_data.type);
541  k = 0;
542  bm = 0;
543  do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
544  if ((! do_route) && (pr->sender_pid == 0))
545  {
546  GNUNET_break (0);
547  do_route = GNUNET_YES;
548  }
549  if (! do_route)
550  {
552  k++;
553  }
554  if (NULL != pr->public_data.target)
555  {
557  k++;
558  }
559  if (GNUNET_OK !=
560  GNUNET_BLOCK_group_serialize (pr->bg, &bf_nonce, &bf_data, &bf_size))
561  {
562  bf_size = 0;
563  bf_data = NULL;
564  }
565  env = GNUNET_MQ_msg_extra (gm,
566  bf_size + k * sizeof(struct GNUNET_PeerIdentity),
568  gm->type = htonl (pr->public_data.type);
569  if (do_route)
571  pr->public_data.priority + 1);
572  else
573  prio = 0;
574  pr->public_data.priority -= prio;
576  pr->public_data.respect_offered += prio;
577  gm->priority = htonl (prio);
578  now = GNUNET_TIME_absolute_get ();
579  ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
580  gm->ttl = htonl (ttl / 1000LL / 1000LL);
581  gm->filter_mutator = htonl (bf_nonce);
582  gm->hash_bitmap = htonl (bm);
583  gm->query = pr->public_data.query;
584  ext = (struct GNUNET_PeerIdentity *) &gm[1];
585  k = 0;
586  if (! do_route)
587  GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]);
588  if (NULL != pr->public_data.target)
589  ext[k++] = *pr->public_data.target;
590  GNUNET_memcpy (&ext[k], bf_data, bf_size);
591  GNUNET_free_non_null (bf_data);
592  return env;
593 }
594 
595 
604 static int
605 clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
606 {
607  struct GSF_PendingRequest *pr = value;
609 
611  "Cleaning up pending request for `%s'.\n",
612  GNUNET_h2s (key));
613  if (NULL != pr->cadet_request)
614  {
617  pr->cadet_request = NULL;
618  }
619  if (NULL != (cont = pr->llc_cont))
620  {
621  pr->llc_cont = NULL;
622  cont (pr->llc_cont_cls, pr, pr->local_result);
623  }
627  pr->bg = NULL;
629  pr->sender_pid = 0;
631  pr->origin_pid = 0;
632  if (NULL != pr->hnode)
633  {
635  pr->hnode = NULL;
636  }
637  if (NULL != pr->qe)
638  {
640  pr->qe = NULL;
641  }
642  if (NULL != pr->gh)
643  {
644  GNUNET_DHT_get_stop (pr->gh);
645  pr->gh = NULL;
646  }
647  if (NULL != pr->warn_task)
648  {
650  pr->warn_task = NULL;
651  }
652  GNUNET_assert (
653  GNUNET_OK ==
656  gettext_noop ("# Pending requests active"),
657  -1,
658  GNUNET_NO);
659  GNUNET_free (pr);
660  return GNUNET_YES;
661 }
662 
663 
670 void
671 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
672 {
674 
675  if (NULL == pr_map)
676  return; /* already cleaned up! */
677  if (GNUNET_NO == full_cleanup)
678  {
679  /* make request inactive (we're no longer interested in more results),
680  * but do NOT remove from our data-structures, we still need it there
681  * to prevent the request from looping */
682  pr->rh = NULL;
683  if (NULL != pr->cadet_request)
684  {
687  pr->cadet_request = NULL;
688  }
689  if (NULL != (cont = pr->llc_cont))
690  {
691  pr->llc_cont = NULL;
692  cont (pr->llc_cont_cls, pr, pr->local_result);
693  }
695  if (NULL != pr->qe)
696  {
698  pr->qe = NULL;
699  }
700  if (NULL != pr->gh)
701  {
702  GNUNET_DHT_get_stop (pr->gh);
703  pr->gh = NULL;
704  }
705  if (NULL != pr->warn_task)
706  {
708  pr->warn_task = NULL;
709  }
710  return;
711  }
713  clean_request (NULL, &pr->public_data.query, pr));
714 }
715 
716 
723 void
725 {
727  pr_map,
729  cls);
730 }
731 
732 
737 {
741  const void *data;
742 
747 
752 
756  size_t size;
757 
762 
767 
771  uint32_t priority;
772 
776  uint32_t anonymity_level;
777 
782 
787 };
788 
789 
797 static void
799  struct GSF_PendingRequest *pr)
800 {
801  if (prq->sender == NULL)
802  return;
805  prq->priority);
806 }
807 
808 
817 static int
818 process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
819 {
820  struct ProcessReplyClosure *prq = cls;
821  struct GSF_PendingRequest *pr = value;
822  struct GNUNET_HashCode chash;
823  struct GNUNET_TIME_Absolute last_transmission;
824 
825  if (NULL == pr->rh)
826  return GNUNET_YES;
828  "Matched result (type %u) for query `%s' with pending request\n",
829  (unsigned int) prq->type,
830  GNUNET_h2s (key));
832  gettext_noop ("# replies received and matched"),
833  1,
834  GNUNET_NO);
836  prq->type,
837  pr->bg,
838  prq->eo,
839  key,
840  NULL,
841  0,
842  prq->data,
843  prq->size);
844  switch (prq->eval)
845  {
848  break;
849 
851  /* short cut: stop processing early, no BF-update, etc. */
856  .rel_value_us);
857  if (GNUNET_YES !=
859  .pr_head,
860  prq->sender,
861  &last_transmission))
862  last_transmission.abs_value_us =
863  GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
864  /* pass on to other peers / local clients */
865  pr->rh (pr->rh_cls,
866  prq->eval,
867  pr,
868  prq->anonymity_level,
869  prq->expiration,
870  last_transmission,
871  prq->type,
872  prq->data,
873  prq->size);
874  return GNUNET_YES;
875 
877 #if INSANE_STATISTICS
879  gettext_noop (
880  "# duplicate replies discarded (bloomfilter)"),
881  1,
882  GNUNET_NO);
883 #endif
884  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
885  return GNUNET_YES; /* duplicate */
886 
889  gettext_noop ("# irrelevant replies discarded"),
890  1,
891  GNUNET_NO);
892  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
893  return GNUNET_YES;
894 
896  return GNUNET_YES; /* wrong namespace */
897 
899  GNUNET_break (0);
900  return GNUNET_YES;
901 
903  GNUNET_break (0);
904  return GNUNET_YES;
905 
908  _ ("Unsupported block type %u\n"),
909  prq->type);
910  return GNUNET_NO;
911  }
912  /* update bloomfilter */
913  GNUNET_CRYPTO_hash (prq->data, prq->size, &chash);
914  GSF_pending_request_update_ (pr, &chash, 1);
915  if (NULL == prq->sender)
916  {
918  "Found result for query `%s' in local datastore\n",
919  GNUNET_h2s (key));
921  gettext_noop ("# results found locally"),
922  1,
923  GNUNET_NO);
924  }
925  else
926  {
927  GSF_dht_lookup_ (pr);
928  }
930  pr->public_data.priority = 0;
933  prq->request_found = GNUNET_YES;
934  /* finally, pass on to other peer / local client */
936  .pr_head,
937  prq->sender,
938  &last_transmission))
939  last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
940  pr->rh (pr->rh_cls,
941  prq->eval,
942  pr,
943  prq->anonymity_level,
944  prq->expiration,
945  last_transmission,
946  prq->type,
947  prq->data,
948  prq->size);
949  return GNUNET_YES;
950 }
951 
952 
957 {
962 
966  struct GNUNET_PeerIdentity origin;
967 
973 };
974 
975 
985 static void
987  int success,
989  const char *msg)
990 {
991  struct PutMigrationContext *pmc = cls;
992  struct GSF_ConnectedPeer *cp;
993  struct GNUNET_TIME_Relative mig_pause;
994  struct GSF_PeerPerformanceData *ppd;
995 
996  if (NULL != datastore_put_load)
997  {
998  if (GNUNET_SYSERR != success)
999  {
1000  GNUNET_LOAD_update (datastore_put_load,
1002  .rel_value_us);
1003  }
1004  else
1005  {
1006  /* on queue failure / timeout, increase the put load dramatically */
1007  GNUNET_LOAD_update (datastore_put_load,
1008  GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1009  }
1010  }
1011  cp = GSF_peer_get_ (&pmc->origin);
1012  if (GNUNET_OK == success)
1013  {
1014  if (NULL != cp)
1015  {
1016  ppd = GSF_get_peer_performance_data_ (cp);
1017  ppd->migration_delay.rel_value_us /= 2;
1018  }
1019  GNUNET_free (pmc);
1020  return;
1021  }
1022  if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1023  {
1024  ppd = GSF_get_peer_performance_data_ (cp);
1025  if (min_expiration.abs_value_us > 0)
1026  {
1028  "Asking to stop migration for %s because datastore is full\n",
1030  GNUNET_TIME_absolute_get_remaining (min_expiration),
1031  GNUNET_YES));
1032  GSF_block_peer_migration_ (cp, min_expiration);
1033  }
1034  else
1035  {
1037  ppd->migration_delay);
1038  ppd->migration_delay =
1040  mig_pause.rel_value_us =
1043  ppd->migration_delay =
1045  GNUNET_log (
1047  "Replicated content already exists locally, asking to stop migration for %s\n",
1050  GNUNET_TIME_relative_to_absolute (mig_pause));
1051  }
1052  }
1053  GNUNET_free (pmc);
1055  gettext_noop ("# Datastore `PUT' failures"),
1056  1,
1057  GNUNET_NO);
1058 }
1059 
1060 
1070 static int
1071 test_put_load_too_high (uint32_t priority)
1072 {
1073  double ld;
1074 
1075  if (NULL == datastore_put_load)
1076  return GNUNET_NO;
1077  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1078  return GNUNET_NO; /* very fast */
1079  ld = GNUNET_LOAD_get_load (datastore_put_load);
1080  if (ld < 2.0 * (1 + priority))
1081  return GNUNET_NO;
1083  gettext_noop (
1084  "# storage requests dropped due to high load"),
1085  1,
1086  GNUNET_NO);
1087  return GNUNET_YES;
1088 }
1089 
1090 
1106 static void
1107 handle_dht_reply (void *cls,
1108  struct GNUNET_TIME_Absolute exp,
1109  const struct GNUNET_HashCode *key,
1110  const struct GNUNET_PeerIdentity *get_path,
1111  unsigned int get_path_length,
1112  const struct GNUNET_PeerIdentity *put_path,
1113  unsigned int put_path_length,
1114  enum GNUNET_BLOCK_Type type,
1115  size_t size,
1116  const void *data)
1117 {
1118  struct GSF_PendingRequest *pr = cls;
1119  struct ProcessReplyClosure prq;
1120  struct PutMigrationContext *pmc;
1121 
1123  gettext_noop ("# Replies received from DHT"),
1124  1,
1125  GNUNET_NO);
1126  memset (&prq, 0, sizeof(prq));
1127  prq.data = data;
1128  prq.expiration = exp;
1129  /* do not allow migrated content to live longer than 1 year */
1132  prq.expiration);
1133  prq.size = size;
1134  prq.type = type;
1135  prq.eo = GNUNET_BLOCK_EO_NONE;
1136  process_reply (&prq, key, pr);
1137  if ((GNUNET_YES == active_to_migration) &&
1139  {
1141  "Replicating result for query `%s' with priority %u\n",
1142  GNUNET_h2s (key),
1143  prq.priority);
1144  pmc = GNUNET_new (struct PutMigrationContext);
1145  pmc->start = GNUNET_TIME_absolute_get ();
1146  pmc->requested = GNUNET_YES;
1147  if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1148  0,
1149  key,
1150  size,
1151  data,
1152  type,
1153  prq.priority,
1154  1 /* anonymity */,
1155  0 /* replication */,
1156  exp,
1157  1 + prq.priority,
1160  pmc))
1161  {
1163  GNUNET_SYSERR,
1165  NULL);
1166  }
1167  }
1168 }
1169 
1170 
1176 void
1178 {
1179  const void *xquery;
1180  size_t xquery_size;
1181  struct GNUNET_PeerIdentity pi;
1182  char buf[sizeof(struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1183 
1184  if (0 != pr->public_data.anonymity_level)
1185  return;
1186  if (NULL != pr->gh)
1187  {
1188  GNUNET_DHT_get_stop (pr->gh);
1189  pr->gh = NULL;
1190  }
1191  xquery = NULL;
1192  xquery_size = 0;
1193  if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1194  {
1195  GNUNET_assert (0 != pr->sender_pid);
1196  GNUNET_PEER_resolve (pr->sender_pid, &pi);
1197  GNUNET_memcpy (&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1198  xquery_size += sizeof(struct GNUNET_PeerIdentity);
1199  }
1201  pr->public_data.type,
1202  &pr->public_data.query,
1205  xquery,
1206  xquery_size,
1208  pr);
1209  if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1211  pr->replies_seen_count,
1212  pr->replies_seen);
1213 }
1214 
1215 
1225 static void
1226 cadet_reply_proc (void *cls,
1227  enum GNUNET_BLOCK_Type type,
1229  size_t data_size,
1230  const void *data)
1231 {
1232  struct GSF_PendingRequest *pr = cls;
1233  struct ProcessReplyClosure prq;
1234  struct GNUNET_HashCode query;
1235 
1236  pr->cadet_request = NULL;
1237  if (GNUNET_BLOCK_TYPE_ANY == type)
1238  {
1239  GNUNET_break (NULL == data);
1240  GNUNET_break (0 == data_size);
1241  pr->cadet_retry_count++;
1243  return; /* give up on cadet */
1244  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1245  /* retry -- without delay, as this is non-anonymous
1246  and cadet/cadet connect will take some time anyway */
1248  &pr->public_data.query,
1249  pr->public_data.type,
1251  pr);
1252  return;
1253  }
1254  if (GNUNET_YES !=
1255  GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, data_size, &query))
1256  {
1258  "Failed to derive key for block of type %d\n",
1259  (int) type);
1260  GNUNET_break_op (0);
1261  return;
1262  }
1264  gettext_noop ("# Replies received from CADET"),
1265  1,
1266  GNUNET_NO);
1267  memset (&prq, 0, sizeof(prq));
1268  prq.data = data;
1269  prq.expiration = expiration;
1270  /* do not allow migrated content to live longer than 1 year */
1273  prq.expiration);
1274  prq.size = data_size;
1275  prq.type = type;
1276  prq.eo = GNUNET_BLOCK_EO_NONE;
1277  process_reply (&prq, &query, pr);
1278 }
1279 
1280 
1286 void
1288 {
1289  if (0 != pr->public_data.anonymity_level)
1290  return;
1291  if (0 == pr->public_data.target)
1292  {
1294  "Cannot do cadet-based download, target peer not known\n");
1295  return;
1296  }
1297  if (NULL != pr->cadet_request)
1298  return;
1300  &pr->public_data.query,
1301  pr->public_data.type,
1303  pr);
1304 }
1305 
1306 
1312 static void
1313 warn_delay_task (void *cls)
1314 {
1315  struct GSF_PendingRequest *pr = cls;
1316 
1318  _ ("Datastore lookup already took %s!\n"),
1321  GNUNET_YES));
1323  &warn_delay_task,
1324  pr);
1325 }
1326 
1327 
1333 static void
1335 {
1336  struct GSF_PendingRequest *pr = cls;
1337 
1339  _ ("On-demand lookup already took %s!\n"),
1342  GNUNET_YES));
1345  pr);
1346 }
1347 
1348 
1349 /* Call our continuation (if we have any) */
1350 static void
1352 {
1354 
1355  GNUNET_assert (NULL == pr->qe);
1356  if (NULL != pr->warn_task)
1357  {
1359  pr->warn_task = NULL;
1360  }
1361  if (NULL == cont)
1362  return; /* no continuation */
1363  pr->llc_cont = NULL;
1364  if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1365  {
1367  {
1368  /* Signal that we are done and that there won't be any
1369  additional results to allow client to clean up state. */
1370  pr->rh (pr->rh_cls,
1372  pr,
1373  UINT32_MAX,
1377  NULL,
1378  0);
1379  }
1380  /* Finally, call our continuation to signal that we are
1381  done with local processing of this request; i.e. to
1382  start reading again from the client. */
1384  return;
1385  }
1386 
1387  cont (pr->llc_cont_cls, pr, pr->local_result);
1388 }
1389 
1390 
1391 /* Update stats and call continuation */
1392 static void
1394 {
1396  "No further local responses available.\n");
1397 #if INSANE_STATISTICS
1401  gettext_noop (
1402  "# requested DBLOCK or IBLOCK not found"),
1403  1,
1404  GNUNET_NO);
1405 #endif
1406  call_continuation (pr);
1407 }
1408 
1409 
1410 /* forward declaration */
1411 static void
1412 process_local_reply (void *cls,
1413  const struct GNUNET_HashCode *key,
1414  size_t size,
1415  const void *data,
1416  enum GNUNET_BLOCK_Type type,
1417  uint32_t priority,
1418  uint32_t anonymity,
1419  uint32_t replication,
1421  uint64_t uid);
1422 
1423 
1424 /* Start a local query */
1425 static void
1427  uint64_t next_uid,
1428  bool random)
1429 {
1432  &warn_delay_task,
1433  pr);
1435  next_uid,
1436  random,
1437  &pr->public_data.query,
1438  pr->public_data.type ==
1441  : pr->public_data.type,
1443  & pr->public_data.options))
1444  ? UINT_MAX
1445  : 1
1446  /* queue priority */,
1448  & pr->public_data.options))
1449  ? UINT_MAX
1451  /* max queue size */,
1453  pr);
1454  if (NULL != pr->qe)
1455  return;
1456  GNUNET_log (
1458  "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1459  GNUNET_h2s (&pr->public_data.query),
1460  pr->public_data.type,
1461  (unsigned long long) next_uid);
1463  gettext_noop (
1464  "# Datastore lookups concluded (error queueing)"),
1465  1,
1466  GNUNET_NO);
1467  call_continuation (pr);
1468 }
1469 
1470 
1489 static void
1491  const struct GNUNET_HashCode *key,
1492  size_t size,
1493  const void *data,
1494  enum GNUNET_BLOCK_Type type,
1495  uint32_t priority,
1496  uint32_t anonymity,
1497  uint32_t replication,
1498  struct GNUNET_TIME_Absolute expiration,
1499  uint64_t uid)
1500 {
1501  struct GSF_PendingRequest *pr = cls;
1502  struct ProcessReplyClosure prq;
1503  struct GNUNET_HashCode query;
1504  unsigned int old_rf;
1505 
1507  pr->warn_task = NULL;
1508  if (NULL == pr->qe)
1509  goto called_from_on_demand;
1510  pr->qe = NULL;
1511  if (
1512  (NULL == key) && pr->seen_null &&
1513  ! pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1514  {
1515  /* No results */
1516 #if INSANE_STATISTICS
1518  gettext_noop (
1519  "# Datastore lookups concluded (no results)"),
1520  1,
1521  GNUNET_NO);
1522 #endif
1523  no_more_local_results (pr);
1524  return;
1525  }
1526  if (((NULL == key) &&
1527  pr->seen_null) || /* We have hit the end for the 2nd time OR */
1528  (pr->seen_null && pr->have_first_uid &&
1529  (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1530  {
1531  /* Seen all results */
1533  gettext_noop (
1534  "# Datastore lookups concluded (seen all)"),
1535  1,
1536  GNUNET_NO);
1537  no_more_local_results (pr);
1538  return;
1539  }
1540  if (NULL == key)
1541  {
1542  GNUNET_assert (! pr->seen_null);
1543  pr->seen_null = true;
1544  start_local_query (pr, 0 /* next_uid */, false /* random */);
1545  return;
1546  }
1547  if (! pr->have_first_uid)
1548  {
1549  pr->first_uid = uid;
1550  pr->have_first_uid = true;
1551  }
1552  pr->result_count++;
1553  if (pr->result_count > MAX_RESULTS)
1554  {
1556  GSF_stats,
1557  gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1558  1,
1559  GNUNET_NO);
1560  no_more_local_results (pr);
1561  return;
1562  }
1564  "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1565  GNUNET_h2s (key),
1566  type,
1567  (unsigned long long) uid);
1568  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1569  {
1571  "Found ONDEMAND block, performing on-demand encoding\n");
1573  gettext_noop (
1574  "# on-demand blocks matched requests"),
1575  1,
1576  GNUNET_NO);
1580  pr);
1582  size,
1583  data,
1584  type,
1585  priority,
1586  anonymity,
1587  replication,
1588  expiration,
1589  uid,
1591  pr))
1592  {
1594  gettext_noop (
1595  "# on-demand lookups performed successfully"),
1596  1,
1597  GNUNET_NO);
1598  return; /* we're done */
1599  }
1601  gettext_noop ("# on-demand lookups failed"),
1602  1,
1603  GNUNET_NO);
1605  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1606  return;
1607  }
1608 called_from_on_demand:
1609  old_rf = pr->public_data.results_found;
1610  memset (&prq, 0, sizeof(prq));
1611  prq.data = data;
1612  prq.expiration = expiration;
1613  prq.size = size;
1614  if (GNUNET_OK !=
1615  GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1616  {
1617  GNUNET_break (0);
1619  key,
1620  size,
1621  data,
1622  UINT_MAX,
1623  UINT_MAX,
1624  NULL,
1625  NULL);
1626  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1627  return;
1628  }
1629  prq.type = type;
1630  prq.priority = priority;
1631  prq.request_found = GNUNET_NO;
1632  prq.anonymity_level = anonymity;
1633  if ((0 == old_rf) && (0 == pr->public_data.results_found))
1636  process_reply (&prq, key, pr);
1637  pr->local_result = prq.eval;
1639  {
1641  GSF_stats,
1642  gettext_noop ("# Datastore lookups concluded (found last result)"),
1643  1,
1644  GNUNET_NO);
1645  call_continuation (pr);
1646  return;
1647  }
1648  if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1650  (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1651  {
1652  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1654  gettext_noop (
1655  "# Datastore lookups concluded (load too high)"),
1656  1,
1657  GNUNET_NO);
1658  call_continuation (pr);
1659  return;
1660  }
1661  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1662 }
1663 
1664 
1672 int
1674  const struct GNUNET_PeerIdentity *target)
1675 {
1676  struct GNUNET_PeerIdentity pi;
1677 
1678  if (0 == pr->origin_pid)
1679  return GNUNET_YES;
1680  GNUNET_PEER_resolve (pr->origin_pid, &pi);
1681  return (0 == memcmp (&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1682  ? GNUNET_NO
1683  : GNUNET_YES;
1684 }
1685 
1686 
1694 void
1697  void *cont_cls)
1698 {
1699  GNUNET_assert (NULL == pr->gh);
1700  GNUNET_assert (NULL == pr->cadet_request);
1701  GNUNET_assert (NULL == pr->llc_cont);
1702  pr->llc_cont = cont;
1703  pr->llc_cont_cls = cont_cls;
1704 #if INSANE_STATISTICS
1706  gettext_noop ("# Datastore lookups initiated"),
1707  1,
1708  GNUNET_NO);
1709 #endif
1710  start_local_query (pr, 0 /* next_uid */, true /* random */);
1711 }
1712 
1713 
1723 void
1724 handle_p2p_put (void *cls, const struct PutMessage *put)
1725 {
1726  struct GSF_ConnectedPeer *cp = cls;
1727  uint16_t msize;
1728  size_t dsize;
1729  enum GNUNET_BLOCK_Type type;
1730  struct GNUNET_TIME_Absolute expiration;
1731  struct GNUNET_HashCode query;
1732  struct ProcessReplyClosure prq;
1733  struct GNUNET_TIME_Relative block_time;
1734  double putl;
1735  struct PutMigrationContext *pmc;
1736 
1738  "Received P2P PUT from %s\n",
1741  msize = ntohs (put->header.size);
1742  dsize = msize - sizeof(struct PutMessage);
1743  type = ntohl (put->type);
1744  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1745  /* do not allow migrated content to live longer than 1 year */
1748  expiration);
1749  if (GNUNET_OK !=
1750  GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query))
1751  {
1752  GNUNET_break_op (0);
1753  return;
1754  }
1756  gettext_noop ("# GAP PUT messages received"),
1757  1,
1758  GNUNET_NO);
1759  /* now, lookup 'query' */
1760  prq.data = (const void *) &put[1];
1761  prq.sender = cp;
1762  prq.size = dsize;
1763  prq.type = type;
1764  prq.expiration = expiration;
1765  prq.priority = 0;
1766  prq.anonymity_level = UINT32_MAX;
1767  prq.request_found = GNUNET_NO;
1768  prq.eo = GNUNET_BLOCK_EO_NONE;
1770  &query,
1771  &process_reply,
1772  &prq);
1773  if (NULL != cp)
1774  {
1777  + 1000 * prq.priority);
1779  }
1780  if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1782  {
1784  "Replicating result for query `%s' with priority %u\n",
1785  GNUNET_h2s (&query),
1786  prq.priority);
1787  pmc = GNUNET_new (struct PutMigrationContext);
1788  pmc->start = GNUNET_TIME_absolute_get ();
1789  pmc->requested = prq.request_found;
1792  &pmc->origin);
1793  if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1794  0,
1795  &query,
1796  dsize,
1797  &put[1],
1798  type,
1799  prq.priority,
1800  1 /* anonymity */,
1801  0 /* replication */,
1802  expiration,
1803  1 + prq.priority,
1806  pmc))
1807  {
1809  GNUNET_SYSERR,
1811  NULL);
1812  }
1813  }
1814  else if (NULL != cp)
1815  {
1817  "Choosing not to keep content `%s' (%d/%d)\n",
1818  GNUNET_h2s (&query),
1821  }
1822  putl = GNUNET_LOAD_get_load (datastore_put_load);
1823  if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1825  (putl > 2.5 * (1 + prq.priority))))
1826  {
1829  block_time = GNUNET_TIME_relative_multiply (
1832  (unsigned int) (60000 * putl * putl)));
1833  GNUNET_log (
1835  "Asking to stop migration for %s because of load %f and events %d/%d\n",
1837  putl,
1839  (GNUNET_NO == prq.request_found));
1841  GNUNET_TIME_relative_to_absolute (block_time));
1842  }
1843 }
1844 
1845 
1852 int
1854 {
1855  return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1856 }
1857 
1858 
1862 void
1864 {
1865  if (GNUNET_OK !=
1867  "fs",
1868  "MAX_PENDING_REQUESTS",
1870  {
1872  "fs",
1873  "MAX_PENDING_REQUESTS");
1874  }
1876  GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1877  datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1878  pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
1879  requests_by_expiration_heap =
1881 }
1882 
1883 
1887 void
1889 {
1892  pr_map = NULL;
1893  GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1894  requests_by_expiration_heap = NULL;
1895  GNUNET_LOAD_value_free (datastore_put_load);
1896  datastore_put_load = NULL;
1897 }
1898 
1899 
1900 /* end of gnunet-service-fs_pr.c */
void GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp, uint64_t pref)
Notify core about a preference we have for the given peer (to allocate more resources towards it)...
Block does not match query (invalid result)
int(* GNUNET_CONTAINER_MulitHashMapIteratorCallback)(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries.
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
API to handle &#39;connected peers&#39;.
struct GSF_ConnectedPeer * sender
Who gave us this reply? NULL for local host (or DHT)
const struct GNUNET_PeerIdentity * target
Identity of a peer hosting the content, otherwise NULl.
static void warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GSF_CadetRequest * GSF_cadet_query(const struct GNUNET_PeerIdentity *target, const struct GNUNET_HashCode *query, enum GNUNET_BLOCK_Type type, GSF_CadetReplyProcessor proc, void *proc_cls)
Look for a block by directly contacting a particular peer.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_subtract(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Relative duration)
Subtract a given relative duration from the given start time.
Definition: time.c:422
const void * data
The data for the reply.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
#define GNUNET_TIME_UNIT_HOURS
One hour.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:673
uint32_t priority
How much was this reply worth to us?
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
int GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
uint64_t rel_value_us
The actual value.
struct GNUNET_LOAD_Value * GNUNET_LOAD_value_init(struct GNUNET_TIME_Relative autodecline)
Create a new load value.
Definition: load.c:124
Request is allowed to refresh bloomfilter and change mingle value.
int GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head, struct GSF_ConnectedPeer *sender, struct GNUNET_TIME_Absolute *result)
Get the last transmission attempt time for the request plan list referenced by pr_head, that was sent to sender.
void GSF_update_datastore_delay_(struct GNUNET_TIME_Absolute start)
We&#39;ve just now completed a datastore request.
Any type of block, used as a wildcard when searching.
Response from FS service with a result for a previous FS search.
Definition: fs.h:328
struct GNUNET_CONTAINER_HeapNode * GNUNET_CONTAINER_heap_insert(struct GNUNET_CONTAINER_Heap *heap, void *element, GNUNET_CONTAINER_HeapCostType cost)
Inserts a new element into the heap.
struct GNUNET_GETOPT_CommandLineOption options[]
Definition: 002.c:5
struct GNUNET_TIME_AbsoluteNBO expiration
When does this result expire?
Definition: fs.h:343
uint32_t num_transmissions
Counter for how often this request has been transmitted (estimate, because we might have the same req...
#define GET_MESSAGE_BIT_TRANSMIT_TO
The peer identity of a peer that had claimed to have the content previously is included (can be used ...
int32_t ttl
Relative time to live in MILLISECONDS (network byte order)
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
struct GNUNET_TIME_Absolute expiration
When the reply expires.
uint32_t priority
How important is this request (network byte order)
bool seen_null
Have we seen a NULL result yet?
struct GNUNET_TIME_Absolute ttl
Current TTL for the request.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Random on unsigned 64-bit values.
#define GNUNET_TIME_UNIT_MINUTES
One minute.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:287
static void no_more_local_results(struct GSF_PendingRequest *pr)
enum GNUNET_BLOCK_EvaluationOptions eo
Control flags for evaluation.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
Closure for process_reply() function.
struct GNUNET_TIME_Absolute GNUNET_TIME_relative_to_absolute(struct GNUNET_TIME_Relative rel)
Convert relative time to an absolute time in the future.
Definition: time.c:246
static size_t data_size
Number of bytes in data.
Definition: gnunet-abd.c:187
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_TIME_UNIT_SECONDS
One second.
unsigned int results_found
Number of results we have found for this request so far.
static void put_migration_continuation(void *cls, int success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_SCHEDULER_Task * warn_task
Task that warns us if the local datastore lookup takes too long.
uint32_t anonymity_level
Desired anonymity level.
double GNUNET_LOAD_get_load(struct GNUNET_LOAD_Value *load)
Get the current load.
Definition: load.c:200
static unsigned int replication
static struct GNUNET_CONTAINER_Heap * requests_by_expiration_heap
Heap with the request that will expire next at the top.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static struct GNUNET_CONTAINER_MultiHashMap * pr_map
All pending requests, ordered by the query.
struct GSF_PendingRequestPlanBijection * pr_head
Fields for the plan module to track a DLL with the request.
struct GSF_PendingRequestData public_data
Public data for the request.
Block does not match xquery (valid result, not relevant for the request)
struct GNUNET_DHT_Handle * GSF_dht
Handle for DHT operations.
bool have_first_uid
Do we have a first UID yet?
Request priority is allowed to be exceeded.
struct GNUNET_STATISTICS_Handle * GSF_stats
Handle for reporting statistics.
#define GNUNET_NO
Definition: gnunet_common.h:78
double GNUNET_LOAD_get_average(struct GNUNET_LOAD_Value *load)
Get the average value given to update so far.
Definition: load.c:215
shared data structures of gnunet-service-fs.c
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
int GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra, struct GSF_PendingRequest *prb)
Test if two pending requests are compatible (would generate the same query modulo filters and should ...
Default behavior.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
struct GNUNET_TIME_Absolute start_time
When did we start with the request.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
static void refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
Recalculate our bloom filter for filtering replies.
static int test_put_load_too_high(uint32_t priority)
Test if the DATABASE (PUT) load on this peer is too high to even consider processing the query at all...
static int process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
We have received a reply; handle it!
static void update_request_performance_data(struct ProcessReplyClosure *prq, struct GSF_PendingRequest *pr)
Update the performance data for the sender (if any) since the sender successfully answered one of our...
void GNUNET_PEER_resolve(GNUNET_PEER_Id id, struct GNUNET_PeerIdentity *pid)
Convert an interned PID to a normal peer identity.
Definition: peer.c:225
uint64_t abs_value_us
The actual value.
const struct GNUNET_CONFIGURATION_Handle * GSF_cfg
Our configuration.
void GSF_pending_request_update_(struct GSF_PendingRequest *pr, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count)
Update a given pending request with additional replies that have been seen.
Internal representation of the hash map.
static unsigned int anonymity_level
Anonymity level option to use for publishing.
void GSF_pending_request_init_()
Setup the subsystem.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
GSF_LocalLookupContinuation llc_cont
Function to call upon completion of the local get request, or NULL for none.
size_t size
Size of data.
uint32_t filter_mutator
The content hash should be mutated using this value before checking against the bloomfilter (used to ...
Type of a block representing a block to be encoded on demand from disk.
void GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute request_time, uint32_t request_priority)
Report on receiving a reply; update the performance record of the given peer.
int GNUNET_BLOCK_get_key(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, const void *block, size_t block_size, struct GNUNET_HashCode *key)
Function called to obtain the key for a block.
Definition: block.c:378
void GNUNET_LOAD_update(struct GNUNET_LOAD_Value *load, uint64_t data)
Update the current load.
Definition: load.c:236
size_t result_count
Result count.
#define GNUNET_TIME_UNIT_FOREVER_ABS
Constant used to specify "forever".
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
struct GNUNET_HashCode query
Primary query hash for this request.
void(* GSF_LocalLookupContinuation)(void *cls, struct GSF_PendingRequest *pr, enum GNUNET_BLOCK_EvaluationResult result)
Function to be called after we&#39;re done processing replies from the local lookup.
struct GNUNET_TIME_Absolute start
Start time for the operation.
unsigned int replies_seen_count
Number of valid entries in the &#39;replies_seen&#39; array.
Request persists indefinitely (no expiration).
API to handle pending requests.
Public data (in the sense of not encapsulated within &#39;gnunet-service-fs_pr&#39;, not in the sense of netw...
GNUNET_PEER_Id origin_pid
Identity of the peer that we should never forward this query to since it originated this query (0 for...
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1253
API to manage query plan.
Entry in our priority queue.
Definition: datastore_api.c:99
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
unsigned int GNUNET_PEER_Id
A GNUNET_PEER_Id is simply a shorter version of a "struct GNUNET_PeerIdentifier" that can be used ins...
static unsigned long long max_pending_requests
Maximum number of requests (from other peers, overall) that we&#39;re willing to have pending at any give...
enum GNUNET_BLOCK_EvaluationResult local_result
Last result from the local datastore lookup evaluation.
void GNUNET_log_config_missing(enum GNUNET_ErrorType kind, const char *section, const char *option)
Log error message about missing configuration option.
static void start_local_query(struct GSF_PendingRequest *pr, uint64_t next_uid, bool random)
static char * value
Value of the record to add/remove.
Valid result, but suppressed because it is a duplicate.
int GSF_pending_request_test_active_(struct GSF_PendingRequest *pr)
Check if the given request is still active.
uint32_t respect
Respect rating for this peer.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
Message to the datastore service asking about specific content.
Definition: datastore.h:140
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:48
uint32_t hash_bitmap
Which of the optional hash codes are present at the end of the message? See GET_MESSAGE_BIT_xx consta...
void * llc_cont_cls
Closure for llc_cont.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GSF_PeerPerformanceData * GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
Return the performance data record for the given peer.
unsigned int GSF_datastore_queue_size
Size of the datastore queue we assume for common requests.
enum GNUNET_BLOCK_EvaluationResult GNUNET_BLOCK_evaluate(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, struct GNUNET_BLOCK_Group *group, enum GNUNET_BLOCK_EvaluationOptions eo, const struct GNUNET_HashCode *query, const void *xquery, size_t xquery_size, const void *reply_block, size_t reply_block_size)
Function called to validate a reply or a request.
Definition: block.c:338
struct GNUNET_BLOCK_Context * GSF_block_ctx
Our block context.
struct GNUNET_HashCode query
Hashcodes of the file(s) we&#39;re looking for.
non-anonymous file-transfer
Inner block in the CHK tree.
void * GNUNET_CONTAINER_heap_peek(const struct GNUNET_CONTAINER_Heap *heap)
Get element stored at the root of heap.
struct GSF_ConnectedPeer * GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
Get a handle for a connected peer.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:687
#define GET_MESSAGE_BIT_RETURN_TO
The peer identity of a peer waiting for the reply is included (used if the response should be transmi...
Last possible valid result.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
int(* GSF_PendingRequestIterator)(void *cls, const struct GNUNET_HashCode *key, struct GSF_PendingRequest *pr)
Signature of function called on each request.
void GSF_local_lookup_(struct GSF_PendingRequest *pr, GSF_LocalLookupContinuation cont, void *cont_cls)
Look up the request in the local datastore.
struct GNUNET_TIME_Absolute qe_start
Time we started the last datastore lookup.
void * rh_cls
Closure for rh.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:442
Handle to a node in a heap.
uint32_t respect_offered
How much respect did we (in total) offer for this request so far (estimate, because we might have the...
#define CONTENT_BANDWIDTH_VALUE
Bandwidth value of a 0-priority content (must be fairly high compared to query since content is typic...
enum GNUNET_BLOCK_Type type
Type of the requested block.
int GNUNET_BLOCK_group_serialize(struct GNUNET_BLOCK_Group *bg, uint32_t *nonce, void **raw_data, size_t *raw_data_size)
Serialize state of a block group.
Definition: block.c:180
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
Request must only be processed locally.
Heap with the minimum cost at the root.
struct GNUNET_MessageHeader header
Message type will be GNUNET_MESSAGE_TYPE_FS_PUT.
Definition: fs.h:333
void GNUNET_CONTAINER_heap_destroy(struct GNUNET_CONTAINER_Heap *heap)
Destroys the heap.
Type of a block representing any type of search result (universal).
A 512-bit hashcode.
GSF_PendingRequestReplyHandler rh
Function to call if we encounter a reply.
static char * expiration
Credential TTL.
Definition: gnunet-abd.c:96
#define GNUNET_TIME_UNIT_MILLISECONDS
One millisecond.
void GNUNET_DHT_get_stop(struct GNUNET_DHT_GetHandle *get_handle)
Stop async DHT-get.
Definition: dht_api.c:1155
uint32_t type
Desired content type.
Definition: datastore.h:150
static void odc_warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GNUNET_DATASTORE_Handle * GSF_dsh
Our connection to the datastore.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct GNUNET_DHT_GetHandle * gh
DHT request handle for this request (or NULL for none).
The block is obtained from the local database, skip cryptographic checks.
void GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr)
Notify the plan about a request being done; destroy all entries associated with this request...
Node in the heap.
unsigned int GNUNET_CONTAINER_heap_get_size(const struct GNUNET_CONTAINER_Heap *heap)
Get the current size of the heap.
Valid result, and there may be more.
#define CADET_RETRY_MAX
If obtaining a block via cadet fails, how often do we retry it before giving up for good (and stickin...
void GSF_cadet_lookup_(struct GSF_PendingRequest *pr)
Consider downloading via cadet (if possible)
int requested
GNUNET_YES if we had a matching request for this block, GNUNET_NO if not.
unsigned int cadet_retry_count
How often have we retried this request via &#39;cadet&#39;? (used to bound overall retries).
void GSF_pending_request_done_()
Shutdown the subsystem.
Handle for a request that is going out via cadet API.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_min(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the minimum of two relative time values.
Definition: time.c:272
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define MAX_RESULTS
Hard limit on the number of results we may get from the datastore per query.
void GNUNET_DHT_get_filter_known_results(struct GNUNET_DHT_GetHandle *get_handle, unsigned int num_results, const struct GNUNET_HashCode *results)
Tell the DHT not to return any of the following known results to this client.
Definition: dht_api.c:1126
int GSF_test_get_load_too_high_(uint32_t priority)
Test if the DATABASE (GET) load on this peer is too high to even consider processing the query at all...
uint32_t original_priority
Priority that this request (originally) had for us.
Values we track for load calculations.
Definition: load.c:35
#define DHT_GET_REPLICATION
Desired replication level for GETs.
GNUNET_BLOCK_EvaluationResult
Possible ways for how a block may relate to a query.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
void GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls)
Iterate over all pending requests.
indexing for the file-sharing service
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_BLOCK_Group * GNUNET_BLOCK_group_create(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, uint32_t nonce, const void *raw_data, size_t raw_data_size,...)
Create a new block group.
Definition: block.c:289
uint32_t type
Type of the block (in big endian).
Definition: fs.h:338
Handle to a GET request.
Definition: dht_api.c:79
struct GNUNET_CONTAINER_Heap * GNUNET_CONTAINER_heap_create(enum GNUNET_CONTAINER_HeapOrder order)
Create a new heap.
struct GNUNET_MQ_Envelope * GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
Generate the message corresponding to the given pending request for transmission to other peers...
static int clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator to free pending requests.
Specified block type not supported by this plugin.
void handle_p2p_put(void *cls, const struct PutMessage *put)
Handle P2P "CONTENT" message.
Allow multiple values with the same key.
unsigned int replies_seen_size
Length of the &#39;replies_seen&#39; array.
int GSF_pending_request_test_target_(struct GSF_PendingRequest *pr, const struct GNUNET_PeerIdentity *target)
Is the given target a legitimate peer for forwarding the given request?
enum GNUNET_BLOCK_Type type
Type of the block.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
static void cadet_reply_proc(void *cls, enum GNUNET_BLOCK_Type type, struct GNUNET_TIME_Absolute expiration, size_t data_size, const void *data)
Function called with a reply from the cadet.
uint32_t anonymity_level
Anonymity requirements for this reply.
struct GNUNET_DATASTORE_QueueEntry * qe
Datastore queue entry for this request (or NULL for none).
static struct GNUNET_LOAD_Value * datastore_put_load
Datastore &#39;PUT&#39; load tracking.
The identity of the host (wraps the signing key of the peer).
struct GNUNET_LOAD_Value * GSF_rt_entry_lifetime
How long do requests typically stay in the routing table?
struct GSF_PendingRequestData * GSF_pending_request_get_data_(struct GSF_PendingRequest *pr)
Obtain the public data associated with a pending request.
enum GSF_PendingRequestOptions options
Options for the request.
uint64_t first_uid
Unique ID of the first result from the local datastore; used to terminate the loop.
int GNUNET_CONTAINER_multihashmap_get_multiple(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
#define GNUNET_ALIGN
gcc-ism to force alignment; we use this to align char-arrays that may then be cast to &#39;struct&#39;s...
void GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
Cancel an active request; must not be called after &#39;proc&#39; was calld.
Request must only be forwarded (no routing)
Performance data kept for a peer.
#define MAX_DATASTORE_QUEUE
Maximum size of the datastore queue for P2P operations.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_min(struct GNUNET_TIME_Absolute t1, struct GNUNET_TIME_Absolute t2)
Return the minimum of two absolute time values.
Definition: time.c:302
struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start(struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, size_t xquery_size, GNUNET_DHT_GetIterator iter, void *iter_cls)
Perform an asynchronous GET operation on the DHT identified.
Definition: dht_api.c:1067
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:375
struct GNUNET_BLOCK_Group * bg
Block group for filtering replies we&#39;ve already seen.
#define GNUNET_LOAD_value_free(lv)
Free a load value.
struct GNUNET_PeerIdentity origin
Request origin.
enum GNUNET_BLOCK_EvaluationResult eval
Evaluation result (returned).
Query format does not match block type (invalid query).
void GSF_dht_lookup_(struct GSF_PendingRequest *pr)
Consider looking up the data in the DHT (anonymity-level permitting).
void GNUNET_PEER_change_rc(GNUNET_PEER_Id id, int delta)
Change the reference counter of an interned PID.
Definition: peer.c:197
#define GNUNET_TIME_UNIT_YEARS
One year (365 days).
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
struct GNUNET_CONTAINER_HeapNode * hnode
Entry for this pending request in the expiration heap, or NULL.
void(* GSF_PendingRequestReplyHandler)(void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, struct GNUNET_TIME_Absolute last_transmission, enum GNUNET_BLOCK_Type type, const void *data, size_t data_len)
Handle a reply to a pending request.
static void process_local_reply(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
We&#39;re processing (local) results for a search request from another peer.
Context for put_migration_continuation().
A connected peer.
Data block (leaf) in the CHK tree.
Block group data.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static void handle_dht_reply(void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_HashCode *key, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length, enum GNUNET_BLOCK_Type type, size_t size, const void *data)
Iterator called on each result obtained for a DHT operation that expects a reply. ...
struct GNUNET_TIME_Relative migration_delay
If we get content we already have from this peer, for how long do we block it? Adjusted based on the ...
unsigned int GSF_cover_content_count
How many content messages have we received &#39;recently&#39; that have not yet been claimed as cover traffic...
struct GSF_PendingRequest * GSF_pending_request_create_(enum GSF_PendingRequestOptions options, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *query, const struct GNUNET_PeerIdentity *target, const char *bf_data, size_t bf_size, uint32_t mingle, uint32_t anonymity_level, uint32_t priority, int32_t ttl, GNUNET_PEER_Id sender_pid, GNUNET_PEER_Id origin_pid, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, void *rh_cls)
Create a new pending request.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
static unsigned int anonymity
uint32_t priority
Priority that this request (still) has for us.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
void GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute block_time)
Ask a peer to stop migrating data to us until the given point in time.
An active request.
int GNUNET_FS_handle_on_demand_block(const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid, GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls)
We&#39;ve received an on-demand encoded block from the datastore.
static struct GNUNET_PeerIdentity pid
Identity of the peer we transmit to / connect to.
struct GSF_CadetRequest * cadet_request
Cadet request handle for this request (or NULL for none).
void GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
Explicitly cancel a pending request.
int GNUNET_CONFIGURATION_get_value_yesno(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option)
Get a configuration value that should be in a set of "YES" or "NO".
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
int request_found
Did we find a matching request?
uint32_t data
The data value.
void * GNUNET_CONTAINER_heap_remove_node(struct GNUNET_CONTAINER_HeapNode *node)
Removes a node from the heap.
GNUNET_BLOCK_EvaluationOptions
Flags that can be set to control the evaluation.
Query is valid, no reply given.
#define GNUNET_MESSAGE_TYPE_FS_GET
P2P request for content (one FS to another).
struct GNUNET_TIME_Relative GNUNET_TIME_relative_saturating_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Saturating multiply relative time by a given factor.
Definition: time.c:501
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
GSF_PendingRequestOptions
Options for pending requests (bits to be ORed).
GNUNET_PEER_Id sender_pid
Identity of the peer that we should use for the &#39;sender&#39; (recipient of the response) when forwarding ...
#define DATASTORE_LOAD_AUTODECLINE
At what frequency should our datastore load decrease automatically (since if we don&#39;t use it...
Each peer along the way should look at &#39;enc&#39; (otherwise only the k-peers closest to the key should lo...
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_HashCode * replies_seen
Array of hash codes of replies we&#39;ve already seen.
static int active_to_migration
Are we allowed to migrate content to this peer.
#define GNUNET_free(ptr)
Wrapper around free.
static void call_continuation(struct GSF_PendingRequest *pr)
Time for relative time used by GNUnet, in microseconds.
void GNUNET_BLOCK_group_destroy(struct GNUNET_BLOCK_Group *bg)
Destroy resources used by a block group.
Definition: block.c:205
int GNUNET_BLOCK_group_set_seen(struct GNUNET_BLOCK_Group *bg, const struct GNUNET_HashCode *seen_results, unsigned int seen_results_count)
Update block group to filter out the given results.
Definition: block.c:409
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:966