GNUnet  0.10.x
gnunet-service-fs_pr.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2009-2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19 */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
32 #include "gnunet-service-fs_pe.h"
33 #include "gnunet-service-fs_pr.h"
35 
36 
40 #define DHT_GET_REPLICATION 5
41 
47 #define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
48 
55 #define CONTENT_BANDWIDTH_VALUE 800
56 
60 #define MAX_RESULTS (100 * 1024)
61 
65 #define INSANE_STATISTICS GNUNET_NO
66 
71 #define CADET_RETRY_MAX 3
72 
73 
78 {
83 
88 
92  void *rh_cls;
93 
98 
103 
108 
113 
118 
123 
129 
134 
139 
145 
151 
156 
161 
166 
170  bool seen_null;
171 
176  uint64_t first_uid;
177 
181  size_t result_count;
182 
187  unsigned int cadet_retry_count;
188 
192  unsigned int replies_seen_count;
193 
197  unsigned int replies_seen_size;
198 };
199 
200 
206 
207 
212 
213 
218 
219 
229 
230 
236 static unsigned long long max_pending_requests = (32 * 1024);
237 
238 
249 static void
251 {
252  if (NULL != pr->bg)
253  {
255  pr->bg = NULL;
256  }
257  if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
258  return; /* no need */
259  pr->bg =
261  type,
263  UINT32_MAX),
264  NULL,
265  0,
266  "seen-set-size",
267  pr->replies_seen_count,
268  NULL);
269  if (NULL == pr->bg)
270  return;
273  pr->replies_seen,
274  pr->replies_seen_count));
275 }
276 
277 
299 struct GSF_PendingRequest *
301  enum GNUNET_BLOCK_Type type,
302  const struct GNUNET_HashCode *query,
303  const struct GNUNET_PeerIdentity *target,
304  const char *bf_data,
305  size_t bf_size,
306  uint32_t mingle,
307  uint32_t anonymity_level,
308  uint32_t priority,
309  int32_t ttl,
312  const struct GNUNET_HashCode *replies_seen,
313  unsigned int replies_seen_count,
315  void *rh_cls)
316 {
317  struct GSF_PendingRequest *pr;
318  struct GSF_PendingRequest *dpr;
319  size_t extra;
320  struct GNUNET_HashCode *eptr;
321 
323  "Creating request handle for `%s' of type %d\n",
324  GNUNET_h2s (query),
325  type);
326 #if INSANE_STATISTICS
328  gettext_noop ("# Pending requests created"),
329  1,
330  GNUNET_NO);
331 #endif
332  extra = 0;
333  if (NULL != target)
334  extra += sizeof (struct GNUNET_PeerIdentity);
335  pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
336  pr->public_data.query = *query;
337  eptr = (struct GNUNET_HashCode *) &pr[1];
338  if (NULL != target)
339  {
340  pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
341  GNUNET_memcpy (eptr, target, sizeof (struct GNUNET_PeerIdentity));
342  }
344  pr->public_data.priority = priority;
345  pr->public_data.original_priority = priority;
347  pr->public_data.type = type;
349  pr->sender_pid = sender_pid;
350  pr->origin_pid = origin_pid;
351  pr->rh = rh;
352  pr->rh_cls = rh_cls;
353  GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
354  if (ttl >= 0)
357  else
361  (uint32_t) (-ttl)));
362  if (replies_seen_count > 0)
363  {
365  pr->replies_seen =
368  replies_seen,
369  replies_seen_count * sizeof (struct GNUNET_HashCode));
371  }
372  if ((NULL != bf_data) &&
374  {
376  pr->public_data.type,
377  mingle,
378  bf_data,
379  bf_size,
380  "seen-set-size",
381  0,
382  NULL);
383  }
384  else if ((replies_seen_count > 0) &&
385  (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
386  {
388  }
390  &pr->public_data.query,
391  pr,
393  if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
394  {
395  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
396  pr,
398  /* make sure we don't track too many requests */
399  while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
401  {
402  dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
403  GNUNET_assert (NULL != dpr);
404  if (pr == dpr)
405  break; /* let the request live briefly... */
406  if (NULL != dpr->rh)
407  dpr->rh (dpr->rh_cls,
409  dpr,
410  UINT32_MAX,
414  NULL,
415  0);
417  }
418  }
420  gettext_noop ("# Pending requests active"),
421  1,
422  GNUNET_NO);
423  return pr;
424 }
425 
432 struct GSF_PendingRequestData *
434 {
435  return &pr->public_data;
436 }
437 
438 
448 int
450  struct GSF_PendingRequest *prb)
451 {
452  if ((pra->public_data.type != prb->public_data.type) ||
453  (0 != memcmp (&pra->public_data.query,
454  &prb->public_data.query,
455  sizeof (struct GNUNET_HashCode))))
456  return GNUNET_NO;
457  return GNUNET_OK;
458 }
459 
460 
469 void
471  const struct GNUNET_HashCode *replies_seen,
472  unsigned int replies_seen_count)
473 {
474  if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
475  return; /* integer overflow */
477  {
478  /* we're responsible for the BF, full refresh */
479  if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
481  pr->replies_seen_size,
482  replies_seen_count + pr->replies_seen_count);
484  replies_seen,
485  sizeof (struct GNUNET_HashCode) * replies_seen_count);
488  }
489  else
490  {
491  if (NULL == pr->bg)
492  {
493  /* we're not the initiator, but the initiator did not give us
494  * any bloom-filter, so we need to create one on-the-fly */
496  }
497  else
498  {
501  replies_seen,
502  pr->replies_seen_count));
503  }
504  }
505  if (NULL != pr->gh)
507  replies_seen_count,
508  replies_seen);
509 }
510 
511 
519 struct GNUNET_MQ_Envelope *
521 {
522  struct GNUNET_MQ_Envelope *env;
523  struct GetMessage *gm;
524  struct GNUNET_PeerIdentity *ext;
525  unsigned int k;
526  uint32_t bm;
527  uint32_t prio;
528  size_t bf_size;
529  struct GNUNET_TIME_Absolute now;
530  int64_t ttl;
531  int do_route;
532  void *bf_data;
533  uint32_t bf_nonce;
534 
536  "Building request message for `%s' of type %d\n",
538  pr->public_data.type);
539  k = 0;
540  bm = 0;
541  do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
542  if ((! do_route) && (pr->sender_pid == 0))
543  {
544  GNUNET_break (0);
545  do_route = GNUNET_YES;
546  }
547  if (! do_route)
548  {
550  k++;
551  }
552  if (NULL != pr->public_data.target)
553  {
555  k++;
556  }
557  if (GNUNET_OK !=
558  GNUNET_BLOCK_group_serialize (pr->bg, &bf_nonce, &bf_data, &bf_size))
559  {
560  bf_size = 0;
561  bf_data = NULL;
562  }
563  env = GNUNET_MQ_msg_extra (gm,
564  bf_size + k * sizeof (struct GNUNET_PeerIdentity),
566  gm->type = htonl (pr->public_data.type);
567  if (do_route)
569  pr->public_data.priority + 1);
570  else
571  prio = 0;
572  pr->public_data.priority -= prio;
574  pr->public_data.respect_offered += prio;
575  gm->priority = htonl (prio);
576  now = GNUNET_TIME_absolute_get ();
577  ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
578  gm->ttl = htonl (ttl / 1000LL / 1000LL);
579  gm->filter_mutator = htonl (bf_nonce);
580  gm->hash_bitmap = htonl (bm);
581  gm->query = pr->public_data.query;
582  ext = (struct GNUNET_PeerIdentity *) &gm[1];
583  k = 0;
584  if (! do_route)
585  GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]);
586  if (NULL != pr->public_data.target)
587  ext[k++] = *pr->public_data.target;
588  GNUNET_memcpy (&ext[k], bf_data, bf_size);
589  GNUNET_free_non_null (bf_data);
590  return env;
591 }
592 
593 
/**
 * Release everything held by one pending request and free the request itself.
 * The signature is that of a multi-hashmap iterator callback (closure, key,
 * value); GSF_pending_request_cancel_() also calls it directly with a NULL
 * closure.
 *
 * @param cls closure; not referenced by any line visible in this listing
 * @param key query hash of the request, formatted into the log message
 * @param value the `struct GSF_PendingRequest` to tear down
 * @return #GNUNET_YES, unconditionally
 */
602 static int
603 clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
604 {
605  struct GSF_PendingRequest *pr = value;
/* NOTE(review): listing line 606 (presumably the declaration of `cont`) and
   line 608 (the head of the log call below) are missing from this extract. */
607 
609  "Cleaning up pending request for `%s'.\n",
610  GNUNET_h2s (key));
611  if (NULL != pr->cadet_request)
612  {
/* NOTE(review): listing lines 613-614 are missing here; presumably the call
   that cancels the CADET query -- confirm against the upstream file. */
615  pr->cadet_request = NULL;
616  }
/* A local-lookup continuation is still registered: clear the field first so
   it cannot be invoked twice, then call it with the last local result. */
617  if (NULL != (cont = pr->llc_cont))
618  {
619  pr->llc_cont = NULL;
620  cont (pr->llc_cont_cls, pr, pr->local_result);
621  }
/* NOTE(review): the statements that release the resources behind `bg`,
   `sender_pid`, `origin_pid`, `hnode`, `qe` and `warn_task` (listing lines
   622-624, 626, 628, 632, 637, 647) are missing from this extract; only the
   resets of the fields survive. */
625  pr->bg = NULL;
627  pr->sender_pid = 0;
629  pr->origin_pid = 0;
630  if (NULL != pr->hnode)
631  {
633  pr->hnode = NULL;
634  }
635  if (NULL != pr->qe)
636  {
638  pr->qe = NULL;
639  }
/* stop an outstanding DHT lookup, if any */
640  if (NULL != pr->gh)
641  {
642  GNUNET_DHT_get_stop (pr->gh);
643  pr->gh = NULL;
644  }
645  if (NULL != pr->warn_task)
646  {
648  pr->warn_task = NULL;
649  }
/* NOTE(review): listing lines 652-653 are missing, so the assertion below and
   the following "# Pending requests active" (-1) call appear fused; they are
   two separate statements in the real file -- confirm upstream. */
650  GNUNET_assert (
651  GNUNET_OK ==
654  gettext_noop ("# Pending requests active"),
655  -1,
656  GNUNET_NO);
/* pr must not be dereferenced after this point */
657  GNUNET_free (pr);
658  return GNUNET_YES;
659 }
660 
661 
/**
 * Cancel a pending request.
 *
 * @param pr request to cancel
 * @param full_cleanup #GNUNET_NO to only deactivate the request (drop its
 *        reply handler, fire a pending local-lookup continuation and stop the
 *        outstanding lookups) while leaving it in our data structures; any
 *        other value hands the request to clean_request(), which frees it
 */
668 void
669 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
670 {
/* NOTE(review): listing line 671 (presumably the declaration of `cont`) is
   missing from this extract. */
672 
673  if (NULL == pr_map)
674  return; /* already cleaned up! */
675  if (GNUNET_NO == full_cleanup)
676  {
677  /* make request inactive (we're no longer interested in more results),
678  * but do NOT remove from our data-structures, we still need it there
679  * to prevent the request from looping */
/* a NULL reply handler is what marks the request as inactive */
680  pr->rh = NULL;
681  if (NULL != pr->cadet_request)
682  {
/* NOTE(review): listing lines 683-684 are missing here; presumably the call
   that cancels the CADET query -- confirm against the upstream file. */
685  pr->cadet_request = NULL;
686  }
/* clear the continuation before invoking it so it cannot run twice */
687  if (NULL != (cont = pr->llc_cont))
688  {
689  pr->llc_cont = NULL;
690  cont (pr->llc_cont_cls, pr, pr->local_result);
691  }
/* NOTE(review): listing lines 692, 695 and 705 are missing below; only the
   resets of `qe` and `warn_task` survive, not the calls that cancel them. */
693  if (NULL != pr->qe)
694  {
696  pr->qe = NULL;
697  }
698  if (NULL != pr->gh)
699  {
700  GNUNET_DHT_get_stop (pr->gh);
701  pr->gh = NULL;
702  }
703  if (NULL != pr->warn_task)
704  {
706  pr->warn_task = NULL;
707  }
708  return;
709  }
/* full cleanup: clean_request() frees pr, so pr is invalid afterwards.
   NOTE(review): listing line 710, the head of the expression wrapping this
   call, is missing from this extract. */
711  clean_request (NULL, &pr->public_data.query, pr));
712 }
713 
714 
721 void
723 {
725  pr_map,
727  cls);
728 }
729 
730 
735 {
739  const void *data;
740 
745 
750 
754  size_t size;
755 
760 
765 
769  uint32_t priority;
770 
774  uint32_t anonymity_level;
775 
780 
785 };
786 
787 
795 static void
797  struct GSF_PendingRequest *pr)
798 {
799  if (prq->sender == NULL)
800  return;
803  prq->priority);
804 }
805 
806 
815 static int
816 process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
817 {
818  struct ProcessReplyClosure *prq = cls;
819  struct GSF_PendingRequest *pr = value;
820  struct GNUNET_HashCode chash;
821  struct GNUNET_TIME_Absolute last_transmission;
822 
823  if (NULL == pr->rh)
824  return GNUNET_YES;
826  "Matched result (type %u) for query `%s' with pending request\n",
827  (unsigned int) prq->type,
828  GNUNET_h2s (key));
830  gettext_noop ("# replies received and matched"),
831  1,
832  GNUNET_NO);
834  prq->type,
835  pr->bg,
836  prq->eo,
837  key,
838  NULL,
839  0,
840  prq->data,
841  prq->size);
842  switch (prq->eval)
843  {
846  break;
848  /* short cut: stop processing early, no BF-update, etc. */
853  .rel_value_us);
854  if (GNUNET_YES !=
856  .pr_head,
857  prq->sender,
858  &last_transmission))
859  last_transmission.abs_value_us =
860  GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
861  /* pass on to other peers / local clients */
862  pr->rh (pr->rh_cls,
863  prq->eval,
864  pr,
865  prq->anonymity_level,
866  prq->expiration,
867  last_transmission,
868  prq->type,
869  prq->data,
870  prq->size);
871  return GNUNET_YES;
873 #if INSANE_STATISTICS
875  gettext_noop (
876  "# duplicate replies discarded (bloomfilter)"),
877  1,
878  GNUNET_NO);
879 #endif
880  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
881  return GNUNET_YES; /* duplicate */
884  gettext_noop ("# irrelevant replies discarded"),
885  1,
886  GNUNET_NO);
887  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
888  return GNUNET_YES;
890  return GNUNET_YES; /* wrong namespace */
892  GNUNET_break (0);
893  return GNUNET_YES;
895  GNUNET_break (0);
896  return GNUNET_YES;
899  _ ("Unsupported block type %u\n"),
900  prq->type);
901  return GNUNET_NO;
902  }
903  /* update bloomfilter */
904  GNUNET_CRYPTO_hash (prq->data, prq->size, &chash);
905  GSF_pending_request_update_ (pr, &chash, 1);
906  if (NULL == prq->sender)
907  {
909  "Found result for query `%s' in local datastore\n",
910  GNUNET_h2s (key));
912  gettext_noop ("# results found locally"),
913  1,
914  GNUNET_NO);
915  }
916  else
917  {
918  GSF_dht_lookup_ (pr);
919  }
921  pr->public_data.priority = 0;
924  prq->request_found = GNUNET_YES;
925  /* finally, pass on to other peer / local client */
927  .pr_head,
928  prq->sender,
929  &last_transmission))
930  last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
931  pr->rh (pr->rh_cls,
932  prq->eval,
933  pr,
934  prq->anonymity_level,
935  prq->expiration,
936  last_transmission,
937  prq->type,
938  prq->data,
939  prq->size);
940  return GNUNET_YES;
941 }
942 
943 
948 {
949 
954 
958  struct GNUNET_PeerIdentity origin;
959 
965 };
966 
967 
977 static void
979  int success,
981  const char *msg)
982 {
983  struct PutMigrationContext *pmc = cls;
984  struct GSF_ConnectedPeer *cp;
985  struct GNUNET_TIME_Relative mig_pause;
986  struct GSF_PeerPerformanceData *ppd;
987 
988  if (NULL != datastore_put_load)
989  {
990  if (GNUNET_SYSERR != success)
991  {
992  GNUNET_LOAD_update (datastore_put_load,
994  .rel_value_us);
995  }
996  else
997  {
998  /* on queue failure / timeout, increase the put load dramatically */
999  GNUNET_LOAD_update (datastore_put_load,
1000  GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1001  }
1002  }
1003  cp = GSF_peer_get_ (&pmc->origin);
1004  if (GNUNET_OK == success)
1005  {
1006  if (NULL != cp)
1007  {
1008  ppd = GSF_get_peer_performance_data_ (cp);
1009  ppd->migration_delay.rel_value_us /= 2;
1010  }
1011  GNUNET_free (pmc);
1012  return;
1013  }
1014  if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1015  {
1016  ppd = GSF_get_peer_performance_data_ (cp);
1017  if (min_expiration.abs_value_us > 0)
1018  {
1020  "Asking to stop migration for %s because datastore is full\n",
1022  GNUNET_TIME_absolute_get_remaining (min_expiration),
1023  GNUNET_YES));
1024  GSF_block_peer_migration_ (cp, min_expiration);
1025  }
1026  else
1027  {
1029  ppd->migration_delay);
1030  ppd->migration_delay =
1032  mig_pause.rel_value_us =
1035  ppd->migration_delay =
1037  GNUNET_log (
1039  "Replicated content already exists locally, asking to stop migration for %s\n",
1042  GNUNET_TIME_relative_to_absolute (mig_pause));
1043  }
1044  }
1045  GNUNET_free (pmc);
1047  gettext_noop ("# Datastore `PUT' failures"),
1048  1,
1049  GNUNET_NO);
1050 }
1051 
1052 
/**
 * Decide whether the datastore PUT load is too high to accept content of the
 * given priority.
 *
 * @param priority priority of the content; the refusal threshold on the
 *        current load is 2.0 * (1 + priority), so higher priority tolerates
 *        more load
 * @return #GNUNET_NO if there is no load tracker, if the tracked average is
 *         below 50, or if the load is under the threshold;
 *         #GNUNET_YES otherwise
 */
1062 static int
1063 test_put_load_too_high (uint32_t priority)
1064 {
1065  double ld;
1066 
/* without a load tracker we have nothing to measure against: accept */
1067  if (NULL == datastore_put_load)
1068  return GNUNET_NO;
1069  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1070  return GNUNET_NO; /* very fast */
1071  ld = GNUNET_LOAD_get_load (datastore_put_load);
1072  if (ld < 2.0 * (1 + priority))
1073  return GNUNET_NO;
/* NOTE(review): listing line 1074, the head of the call taking the arguments
   below, is missing from this extract; presumably a statistics counter
   update -- confirm against the upstream file. */
1075  gettext_noop (
1076  "# storage requests dropped due to high load"),
1077  1,
1078  GNUNET_NO);
1079  return GNUNET_YES;
1080 }
1081 
1082 
1098 static void
1099 handle_dht_reply (void *cls,
1100  struct GNUNET_TIME_Absolute exp,
1101  const struct GNUNET_HashCode *key,
1102  const struct GNUNET_PeerIdentity *get_path,
1103  unsigned int get_path_length,
1104  const struct GNUNET_PeerIdentity *put_path,
1105  unsigned int put_path_length,
1106  enum GNUNET_BLOCK_Type type,
1107  size_t size,
1108  const void *data)
1109 {
1110  struct GSF_PendingRequest *pr = cls;
1111  struct ProcessReplyClosure prq;
1112  struct PutMigrationContext *pmc;
1113 
1115  gettext_noop ("# Replies received from DHT"),
1116  1,
1117  GNUNET_NO);
1118  memset (&prq, 0, sizeof (prq));
1119  prq.data = data;
1120  prq.expiration = exp;
1121  /* do not allow migrated content to live longer than 1 year */
1124  prq.expiration);
1125  prq.size = size;
1126  prq.type = type;
1127  prq.eo = GNUNET_BLOCK_EO_NONE;
1128  process_reply (&prq, key, pr);
1129  if ((GNUNET_YES == active_to_migration) &&
1131  {
1133  "Replicating result for query `%s' with priority %u\n",
1134  GNUNET_h2s (key),
1135  prq.priority);
1136  pmc = GNUNET_new (struct PutMigrationContext);
1137  pmc->start = GNUNET_TIME_absolute_get ();
1138  pmc->requested = GNUNET_YES;
1139  if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1140  0,
1141  key,
1142  size,
1143  data,
1144  type,
1145  prq.priority,
1146  1 /* anonymity */,
1147  0 /* replication */,
1148  exp,
1149  1 + prq.priority,
1152  pmc))
1153  {
1155  GNUNET_SYSERR,
1157  NULL);
1158  }
1159  }
1160 }
1161 
1162 
1168 void
1170 {
1171  const void *xquery;
1172  size_t xquery_size;
1173  struct GNUNET_PeerIdentity pi;
1174  char buf[sizeof (struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1175 
1176  if (0 != pr->public_data.anonymity_level)
1177  return;
1178  if (NULL != pr->gh)
1179  {
1180  GNUNET_DHT_get_stop (pr->gh);
1181  pr->gh = NULL;
1182  }
1183  xquery = NULL;
1184  xquery_size = 0;
1185  if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1186  {
1187  GNUNET_assert (0 != pr->sender_pid);
1188  GNUNET_PEER_resolve (pr->sender_pid, &pi);
1189  GNUNET_memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1190  xquery_size += sizeof (struct GNUNET_PeerIdentity);
1191  }
1193  pr->public_data.type,
1194  &pr->public_data.query,
1197  xquery,
1198  xquery_size,
1200  pr);
1201  if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1203  pr->replies_seen_count,
1204  pr->replies_seen);
1205 }
1206 
1207 
1217 static void
1218 cadet_reply_proc (void *cls,
1219  enum GNUNET_BLOCK_Type type,
1221  size_t data_size,
1222  const void *data)
1223 {
1224  struct GSF_PendingRequest *pr = cls;
1225  struct ProcessReplyClosure prq;
1226  struct GNUNET_HashCode query;
1227 
1228  pr->cadet_request = NULL;
1229  if (GNUNET_BLOCK_TYPE_ANY == type)
1230  {
1231  GNUNET_break (NULL == data);
1232  GNUNET_break (0 == data_size);
1233  pr->cadet_retry_count++;
1235  return; /* give up on cadet */
1236  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1237  /* retry -- without delay, as this is non-anonymous
1238  and cadet/cadet connect will take some time anyway */
1240  &pr->public_data.query,
1241  pr->public_data.type,
1243  pr);
1244  return;
1245  }
1246  if (GNUNET_YES !=
1247  GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, data_size, &query))
1248  {
1250  "Failed to derive key for block of type %d\n",
1251  (int) type);
1252  GNUNET_break_op (0);
1253  return;
1254  }
1256  gettext_noop ("# Replies received from CADET"),
1257  1,
1258  GNUNET_NO);
1259  memset (&prq, 0, sizeof (prq));
1260  prq.data = data;
1261  prq.expiration = expiration;
1262  /* do not allow migrated content to live longer than 1 year */
1265  prq.expiration);
1266  prq.size = data_size;
1267  prq.type = type;
1268  prq.eo = GNUNET_BLOCK_EO_NONE;
1269  process_reply (&prq, &query, pr);
1270 }
1271 
1272 
1278 void
1280 {
1281  if (0 != pr->public_data.anonymity_level)
1282  return;
1283  if (0 == pr->public_data.target)
1284  {
1286  "Cannot do cadet-based download, target peer not known\n");
1287  return;
1288  }
1289  if (NULL != pr->cadet_request)
1290  return;
1292  &pr->public_data.query,
1293  pr->public_data.type,
1295  pr);
1296 }
1297 
1298 
/**
 * Task that issues a warning if the datastore lookup takes too long.
 *
 * @param cls the `struct GSF_PendingRequest` the lookup belongs to
 */
1304 static void
1305 warn_delay_task (void *cls)
1306 {
1307  struct GSF_PendingRequest *pr = cls;
1308 
/* NOTE(review): listing lines 1309 and 1311-1312 (the head of the log call
   and the value formatted into %s) are missing from this extract. */
1310  _ ("Datastore lookup already took %s!\n"),
1313  GNUNET_YES));
/* NOTE(review): listing line 1314 is missing; the surviving arguments
   (&warn_delay_task, pr) suggest the task re-schedules itself for the same
   request -- confirm against the upstream file. */
1315  &warn_delay_task,
1316  pr);
1317 }
1318 
1319 
1325 static void
1327 {
1328  struct GSF_PendingRequest *pr = cls;
1329 
1331  _ ("On-demand lookup already took %s!\n"),
1334  GNUNET_YES));
1337  pr);
1338 }
1339 
1340 
1341 /* Call our continuation (if we have any) */
1342 static void
1344 {
1346 
1347  GNUNET_assert (NULL == pr->qe);
1348  if (NULL != pr->warn_task)
1349  {
1351  pr->warn_task = NULL;
1352  }
1353  if (NULL == cont)
1354  return; /* no continuation */
1355  pr->llc_cont = NULL;
1356  if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1357  {
1359  {
1360  /* Signal that we are done and that there won't be any
1361  additional results to allow client to clean up state. */
1362  pr->rh (pr->rh_cls,
1364  pr,
1365  UINT32_MAX,
1369  NULL,
1370  0);
1371  }
1372  /* Finally, call our continuation to signal that we are
1373  done with local processing of this request; i.e. to
1374  start reading again from the client. */
1376  return;
1377  }
1378 
1379  cont (pr->llc_cont_cls, pr, pr->local_result);
1380 }
1381 
1382 
1383 /* Update stats and call continuation */
1384 static void
1386 {
1388  "No further local responses available.\n");
1389 #if INSANE_STATISTICS
1393  gettext_noop (
1394  "# requested DBLOCK or IBLOCK not found"),
1395  1,
1396  GNUNET_NO);
1397 #endif
1398  call_continuation (pr);
1399 }
1400 
1401 
1402 /* forward declaration */
1403 static void
1404 process_local_reply (void *cls,
1405  const struct GNUNET_HashCode *key,
1406  size_t size,
1407  const void *data,
1408  enum GNUNET_BLOCK_Type type,
1409  uint32_t priority,
1410  uint32_t anonymity,
1411  uint32_t replication,
1413  uint64_t uid);
1414 
1415 
1416 /* Start a local query */
1417 static void
1419  uint64_t next_uid,
1420  bool random)
1421 {
1424  &warn_delay_task,
1425  pr);
1427  next_uid,
1428  random,
1429  &pr->public_data.query,
1430  pr->public_data.type ==
1433  : pr->public_data.type,
1435  pr->public_data.options))
1436  ? UINT_MAX
1437  : 1
1438  /* queue priority */,
1440  pr->public_data.options))
1441  ? UINT_MAX
1443  /* max queue size */,
1445  pr);
1446  if (NULL != pr->qe)
1447  return;
1448  GNUNET_log (
1450  "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1451  GNUNET_h2s (&pr->public_data.query),
1452  pr->public_data.type,
1453  (unsigned long long) next_uid);
1455  gettext_noop (
1456  "# Datastore lookups concluded (error queueing)"),
1457  1,
1458  GNUNET_NO);
1459  call_continuation (pr);
1460 }
1461 
1462 
1481 static void
1483  const struct GNUNET_HashCode *key,
1484  size_t size,
1485  const void *data,
1486  enum GNUNET_BLOCK_Type type,
1487  uint32_t priority,
1488  uint32_t anonymity,
1489  uint32_t replication,
1490  struct GNUNET_TIME_Absolute expiration,
1491  uint64_t uid)
1492 {
1493  struct GSF_PendingRequest *pr = cls;
1494  struct ProcessReplyClosure prq;
1495  struct GNUNET_HashCode query;
1496  unsigned int old_rf;
1497 
1499  pr->warn_task = NULL;
1500  if (NULL == pr->qe)
1501  goto called_from_on_demand;
1502  pr->qe = NULL;
1503  if (
1504  (NULL == key) && pr->seen_null &&
1505  ! pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1506  {
1507  /* No results */
1508 #if INSANE_STATISTICS
1510  gettext_noop (
1511  "# Datastore lookups concluded (no results)"),
1512  1,
1513  GNUNET_NO);
1514 #endif
1515  no_more_local_results (pr);
1516  return;
1517  }
1518  if (((NULL == key) &&
1519  pr->seen_null) || /* We have hit the end for the 2nd time OR */
1520  (pr->seen_null && pr->have_first_uid &&
1521  (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1522  {
1523  /* Seen all results */
1525  gettext_noop (
1526  "# Datastore lookups concluded (seen all)"),
1527  1,
1528  GNUNET_NO);
1529  no_more_local_results (pr);
1530  return;
1531  }
1532  if (NULL == key)
1533  {
1534  GNUNET_assert (! pr->seen_null);
1535  pr->seen_null = true;
1536  start_local_query (pr, 0 /* next_uid */, false /* random */);
1537  return;
1538  }
1539  if (! pr->have_first_uid)
1540  {
1541  pr->first_uid = uid;
1542  pr->have_first_uid = true;
1543  }
1544  pr->result_count++;
1545  if (pr->result_count > MAX_RESULTS)
1546  {
1548  GSF_stats,
1549  gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1550  1,
1551  GNUNET_NO);
1552  no_more_local_results (pr);
1553  return;
1554  }
1556  "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1557  GNUNET_h2s (key),
1558  type,
1559  (unsigned long long) uid);
1560  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1561  {
1563  "Found ONDEMAND block, performing on-demand encoding\n");
1565  gettext_noop (
1566  "# on-demand blocks matched requests"),
1567  1,
1568  GNUNET_NO);
1572  pr);
1574  size,
1575  data,
1576  type,
1577  priority,
1578  anonymity,
1579  replication,
1580  expiration,
1581  uid,
1583  pr))
1584  {
1586  gettext_noop (
1587  "# on-demand lookups performed successfully"),
1588  1,
1589  GNUNET_NO);
1590  return; /* we're done */
1591  }
1593  gettext_noop ("# on-demand lookups failed"),
1594  1,
1595  GNUNET_NO);
1597  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1598  return;
1599  }
1600 called_from_on_demand:
1601  old_rf = pr->public_data.results_found;
1602  memset (&prq, 0, sizeof (prq));
1603  prq.data = data;
1604  prq.expiration = expiration;
1605  prq.size = size;
1606  if (GNUNET_OK !=
1607  GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1608  {
1609  GNUNET_break (0);
1611  key,
1612  size,
1613  data,
1614  UINT_MAX,
1615  UINT_MAX,
1616  NULL,
1617  NULL);
1618  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1619  return;
1620  }
1621  prq.type = type;
1622  prq.priority = priority;
1623  prq.request_found = GNUNET_NO;
1624  prq.anonymity_level = anonymity;
1625  if ((0 == old_rf) && (0 == pr->public_data.results_found))
1628  process_reply (&prq, key, pr);
1629  pr->local_result = prq.eval;
1631  {
1633  GSF_stats,
1634  gettext_noop ("# Datastore lookups concluded (found last result)"),
1635  1,
1636  GNUNET_NO);
1637  call_continuation (pr);
1638  return;
1639  }
1640  if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1642  (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1643  {
1644  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1646  gettext_noop (
1647  "# Datastore lookups concluded (load too high)"),
1648  1,
1649  GNUNET_NO);
1650  call_continuation (pr);
1651  return;
1652  }
1653  start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1654 }
1655 
1656 
1664 int
1666  const struct GNUNET_PeerIdentity *target)
1667 {
1668  struct GNUNET_PeerIdentity pi;
1669 
1670  if (0 == pr->origin_pid)
1671  return GNUNET_YES;
1672  GNUNET_PEER_resolve (pr->origin_pid, &pi);
1673  return (0 == memcmp (&pi, target, sizeof (struct GNUNET_PeerIdentity)))
1674  ? GNUNET_NO
1675  : GNUNET_YES;
1676 }
1677 
1678 
1686 void
1689  void *cont_cls)
1690 {
1691  GNUNET_assert (NULL == pr->gh);
1692  GNUNET_assert (NULL == pr->cadet_request);
1693  GNUNET_assert (NULL == pr->llc_cont);
1694  pr->llc_cont = cont;
1695  pr->llc_cont_cls = cont_cls;
1696 #if INSANE_STATISTICS
1698  gettext_noop ("# Datastore lookups initiated"),
1699  1,
1700  GNUNET_NO);
1701 #endif
1702  start_local_query (pr, 0 /* next_uid */, true /* random */);
1703 }
1704 
1705 
1715 void
1716 handle_p2p_put (void *cls, const struct PutMessage *put)
1717 {
1718  struct GSF_ConnectedPeer *cp = cls;
1719  uint16_t msize;
1720  size_t dsize;
1721  enum GNUNET_BLOCK_Type type;
1722  struct GNUNET_TIME_Absolute expiration;
1723  struct GNUNET_HashCode query;
1724  struct ProcessReplyClosure prq;
1725  struct GNUNET_TIME_Relative block_time;
1726  double putl;
1727  struct PutMigrationContext *pmc;
1728 
1730  "Received P2P PUT from %s\n",
1733  msize = ntohs (put->header.size);
1734  dsize = msize - sizeof (struct PutMessage);
1735  type = ntohl (put->type);
1736  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1737  /* do not allow migrated content to live longer than 1 year */
1740  expiration);
1741  if (GNUNET_OK !=
1742  GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query))
1743  {
1744  GNUNET_break_op (0);
1745  return;
1746  }
1748  gettext_noop ("# GAP PUT messages received"),
1749  1,
1750  GNUNET_NO);
1751  /* now, lookup 'query' */
1752  prq.data = (const void *) &put[1];
1753  prq.sender = cp;
1754  prq.size = dsize;
1755  prq.type = type;
1756  prq.expiration = expiration;
1757  prq.priority = 0;
1758  prq.anonymity_level = UINT32_MAX;
1759  prq.request_found = GNUNET_NO;
1760  prq.eo = GNUNET_BLOCK_EO_NONE;
1762  &query,
1763  &process_reply,
1764  &prq);
1765  if (NULL != cp)
1766  {
1769  1000 * prq.priority);
1771  }
1772  if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1774  {
1776  "Replicating result for query `%s' with priority %u\n",
1777  GNUNET_h2s (&query),
1778  prq.priority);
1779  pmc = GNUNET_new (struct PutMigrationContext);
1780  pmc->start = GNUNET_TIME_absolute_get ();
1781  pmc->requested = prq.request_found;
1784  &pmc->origin);
1785  if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1786  0,
1787  &query,
1788  dsize,
1789  &put[1],
1790  type,
1791  prq.priority,
1792  1 /* anonymity */,
1793  0 /* replication */,
1794  expiration,
1795  1 + prq.priority,
1798  pmc))
1799  {
1801  GNUNET_SYSERR,
1803  NULL);
1804  }
1805  }
1806  else if (NULL != cp)
1807  {
1809  "Choosing not to keep content `%s' (%d/%d)\n",
1810  GNUNET_h2s (&query),
1813  }
1814  putl = GNUNET_LOAD_get_load (datastore_put_load);
1815  if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1817  (putl > 2.5 * (1 + prq.priority))))
1818  {
1821  block_time = GNUNET_TIME_relative_multiply (
1824  (unsigned int) (60000 * putl * putl)));
1825  GNUNET_log (
1827  "Asking to stop migration for %s because of load %f and events %d/%d\n",
1829  putl,
1831  (GNUNET_NO == prq.request_found));
1833  GNUNET_TIME_relative_to_absolute (block_time));
1834  }
1835 }
1836 
1837 
1844 int
1846 {
1847  return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1848 }
1849 
1850 
1854 void
1856 {
1857  if (GNUNET_OK !=
1859  "fs",
1860  "MAX_PENDING_REQUESTS",
1862  {
1864  "fs",
1865  "MAX_PENDING_REQUESTS");
1866  }
1868  GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1869  datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1870  pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
1871  requests_by_expiration_heap =
1873 }
1874 
1875 
1879 void
1881 {
1884  pr_map = NULL;
1885  GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1886  requests_by_expiration_heap = NULL;
1887  GNUNET_LOAD_value_free (datastore_put_load);
1888  datastore_put_load = NULL;
1889 }
1890 
1891 
1892 /* end of gnunet-service-fs_pr.c */
void GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp, uint64_t pref)
Notify core about a preference we have for the given peer (to allocate more resources towards it)...
Block does not match query (invalid result)
int(* GNUNET_CONTAINER_MulitHashMapIteratorCallback)(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries.
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
API to handle 'connected peers'.
struct GSF_ConnectedPeer * sender
Who gave us this reply? NULL for local host (or DHT)
const struct GNUNET_PeerIdentity * target
Identity of a peer hosting the content, otherwise NULL.
static void warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GSF_CadetRequest * GSF_cadet_query(const struct GNUNET_PeerIdentity *target, const struct GNUNET_HashCode *query, enum GNUNET_BLOCK_Type type, GSF_CadetReplyProcessor proc, void *proc_cls)
Look for a block by directly contacting a particular peer.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_subtract(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Relative duration)
Subtract a given relative duration from the given start time.
Definition: time.c:419
const void * data
The data for the reply.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static char * expiration
Credential TTL.
#define GNUNET_TIME_UNIT_HOURS
One hour.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:670
uint32_t priority
How much was this reply worth to us?
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
int GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
uint64_t rel_value_us
The actual value.
struct GNUNET_LOAD_Value * GNUNET_LOAD_value_init(struct GNUNET_TIME_Relative autodecline)
Create a new load value.
Definition: load.c:125
Request is allowed to refresh bloomfilter and change mingle value.
int GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head, struct GSF_ConnectedPeer *sender, struct GNUNET_TIME_Absolute *result)
Get the last transmission attempt time for the request plan list referenced by pr_head, that was sent to sender.
void GSF_update_datastore_delay_(struct GNUNET_TIME_Absolute start)
We've just now completed a datastore request.
Any type of block, used as a wildcard when searching.
Response from FS service with a result for a previous FS search.
Definition: fs.h:336
struct GNUNET_CONTAINER_HeapNode * GNUNET_CONTAINER_heap_insert(struct GNUNET_CONTAINER_Heap *heap, void *element, GNUNET_CONTAINER_HeapCostType cost)
Inserts a new element into the heap.
struct GNUNET_GETOPT_CommandLineOption options[]
Definition: 002.c:5
struct GNUNET_TIME_AbsoluteNBO expiration
When does this result expire?
Definition: fs.h:352
uint32_t num_transmissions
Counter for how often this request has been transmitted (estimate, because we might have the same req...
#define GET_MESSAGE_BIT_TRANSMIT_TO
The peer identity of a peer that had claimed to have the content previously is included (can be used ...
int32_t ttl
Relative time to live in MILLISECONDS (network byte order)
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
struct GNUNET_TIME_Absolute expiration
When the reply expires.
uint32_t priority
How important is this request (network byte order)
bool seen_null
Have we seen a NULL result yet?
struct GNUNET_TIME_Absolute ttl
Current TTL for the request.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Random on unsigned 64-bit values.
#define GNUNET_TIME_UNIT_MINUTES
One minute.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:286
static void no_more_local_results(struct GSF_PendingRequest *pr)
enum GNUNET_BLOCK_EvaluationOptions eo
Control flags for evaluation.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
Closure for process_reply() function.
struct GNUNET_TIME_Absolute GNUNET_TIME_relative_to_absolute(struct GNUNET_TIME_Relative rel)
Convert relative time to an absolute time in the future.
Definition: time.c:245
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_TIME_UNIT_SECONDS
One second.
unsigned int results_found
Number of results we have found for this request so far.
static void put_migration_continuation(void *cls, int success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_SCHEDULER_Task * warn_task
Task that warns us if the local datastore lookup takes too long.
uint32_t anonymity_level
Desired anonymity level.
double GNUNET_LOAD_get_load(struct GNUNET_LOAD_Value *load)
Get the current load.
Definition: load.c:202
static unsigned int replication
static struct GNUNET_CONTAINER_Heap * requests_by_expiration_heap
Heap with the request that will expire next at the top.
static struct GNUNET_CONTAINER_MultiHashMap * pr_map
All pending requests, ordered by the query.
struct GSF_PendingRequestPlanBijection * pr_head
Fields for the plan module to track a DLL with the request.
struct GSF_PendingRequestData public_data
Public data for the request.
Block does not match xquery (valid result, not relevant for the request)
struct GNUNET_DHT_Handle * GSF_dht
Handle for DHT operations.
bool have_first_uid
Do we have a first UID yet?
Request priority is allowed to be exceeded.
struct GNUNET_STATISTICS_Handle * GSF_stats
Handle for reporting statistics.
#define GNUNET_NO
Definition: gnunet_common.h:81
double GNUNET_LOAD_get_average(struct GNUNET_LOAD_Value *load)
Get the average value given to update so far.
Definition: load.c:217
shared data structures of gnunet-service-fs.c
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:78
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
int GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra, struct GSF_PendingRequest *prb)
Test if two pending requests are compatible (would generate the same query modulo filters and should ...
Default behavior.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
struct GNUNET_TIME_Absolute start_time
When did we start with the request.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
static void refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
Recalculate our bloom filter for filtering replies.
static int test_put_load_too_high(uint32_t priority)
Test if the DATABASE (PUT) load on this peer is too high to even consider processing the query at all...
static int process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
We have received a reply; handle it!
static void update_request_performance_data(struct ProcessReplyClosure *prq, struct GSF_PendingRequest *pr)
Update the performance data for the sender (if any) since the sender successfully answered one of our...
void GNUNET_PEER_resolve(GNUNET_PEER_Id id, struct GNUNET_PeerIdentity *pid)
Convert an interned PID to a normal peer identity.
Definition: peer.c:226
uint64_t abs_value_us
The actual value.
const struct GNUNET_CONFIGURATION_Handle * GSF_cfg
Our configuration.
void GSF_pending_request_update_(struct GSF_PendingRequest *pr, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count)
Update a given pending request with additional replies that have been seen.
Internal representation of the hash map.
static unsigned int anonymity_level
Anonymity level option to use for publishing.
void GSF_pending_request_init_()
Setup the subsystem.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
GSF_LocalLookupContinuation llc_cont
Function to call upon completion of the local get request, or NULL for none.
size_t size
Size of data.
uint32_t filter_mutator
The content hash should be mutated using this value before checking against the bloomfilter (used to ...
Type of a block representing a block to be encoded on demand from disk.
void GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute request_time, uint32_t request_priority)
Report on receiving a reply; update the performance record of the given peer.
int GNUNET_BLOCK_get_key(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, const void *block, size_t block_size, struct GNUNET_HashCode *key)
Function called to obtain the key for a block.
Definition: block.c:379
void GNUNET_LOAD_update(struct GNUNET_LOAD_Value *load, uint64_t data)
Update the current load.
Definition: load.c:238
size_t result_count
Result count.
#define GNUNET_TIME_UNIT_FOREVER_ABS
Constant used to specify "forever".
#define _(String)
GNU gettext support macro.
Definition: platform.h:208
struct GNUNET_HashCode query
Primary query hash for this request.
void(* GSF_LocalLookupContinuation)(void *cls, struct GSF_PendingRequest *pr, enum GNUNET_BLOCK_EvaluationResult result)
Function to be called after we're done processing replies from the local lookup.
struct GNUNET_TIME_Absolute start
Start time for the operation.
unsigned int replies_seen_count
Number of valid entries in the 'replies_seen' array.
Request persists indefinitely (no expiration).
API to handle pending requests.
Public data (in the sense of not encapsulated within 'gnunet-service-fs_pr', not in the sense of netw...
GNUNET_PEER_Id origin_pid
Identity of the peer that we should never forward this query to since it originated this query (0 for...
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1246
API to manage query plan.
Entry in our priority queue.
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
unsigned int GNUNET_PEER_Id
A GNUNET_PEER_Id is simply a shorter version of a "struct GNUNET_PeerIdentifier" that can be used ins...
#define GNUNET_memcpy(dst, src, n)
static unsigned long long max_pending_requests
Maximum number of requests (from other peers, overall) that we're willing to have pending at any give...
enum GNUNET_BLOCK_EvaluationResult local_result
Last result from the local datastore lookup evaluation.
void GNUNET_log_config_missing(enum GNUNET_ErrorType kind, const char *section, const char *option)
Log error message about missing configuration option.
static void start_local_query(struct GSF_PendingRequest *pr, uint64_t next_uid, bool random)
static char * value
Value of the record to add/remove.
Valid result, but suppressed because it is a duplicate.
int GSF_pending_request_test_active_(struct GSF_PendingRequest *pr)
Check if the given request is still active.
uint32_t respect
Respect rating for this peer.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
Message to the datastore service asking about specific content.
Definition: datastore.h:143
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:44
uint32_t hash_bitmap
Which of the optional hash codes are present at the end of the message? See GET_MESSAGE_BIT_xx consta...
void * llc_cont_cls
Closure for llc_cont.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GSF_PeerPerformanceData * GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
Return the performance data record for the given peer.
unsigned int GSF_datastore_queue_size
Size of the datastore queue we assume for common requests.
enum GNUNET_BLOCK_EvaluationResult GNUNET_BLOCK_evaluate(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, struct GNUNET_BLOCK_Group *group, enum GNUNET_BLOCK_EvaluationOptions eo, const struct GNUNET_HashCode *query, const void *xquery, size_t xquery_size, const void *reply_block, size_t reply_block_size)
Function called to validate a reply or a request.
Definition: block.c:339
struct GNUNET_BLOCK_Context * GSF_block_ctx
Our block context.
struct GNUNET_HashCode query
Hashcodes of the file(s) we're looking for.
non-anonymous file-transfer
Inner block in the CHK tree.
void * GNUNET_CONTAINER_heap_peek(const struct GNUNET_CONTAINER_Heap *heap)
Get element stored at the root of heap.
struct GSF_ConnectedPeer * GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
Get a handle for a connected peer.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:727
#define GET_MESSAGE_BIT_RETURN_TO
The peer identity of a peer waiting for the reply is included (used if the response should be transmi...
Last possible valid result.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
int(* GSF_PendingRequestIterator)(void *cls, const struct GNUNET_HashCode *key, struct GSF_PendingRequest *pr)
Signature of function called on each request.
void GSF_local_lookup_(struct GSF_PendingRequest *pr, GSF_LocalLookupContinuation cont, void *cont_cls)
Look up the request in the local datastore.
struct GNUNET_TIME_Absolute qe_start
Time we started the last datastore lookup.
void * rh_cls
Closure for rh.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:439
Handle to a node in a heap.
uint32_t respect_offered
How much respect did we (in total) offer for this request so far (estimate, because we might have the...
#define CONTENT_BANDWIDTH_VALUE
Bandwidth value of a 0-priority content (must be fairly high compared to query since content is typic...
enum GNUNET_BLOCK_Type type
Type of the requested block.
int GNUNET_BLOCK_group_serialize(struct GNUNET_BLOCK_Group *bg, uint32_t *nonce, void **raw_data, size_t *raw_data_size)
Serialize state of a block group.
Definition: block.c:181
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
Request must only be processed locally.
Heap with the minimum cost at the root.
struct GNUNET_MessageHeader header
Message type will be GNUNET_MESSAGE_TYPE_FS_PUT.
Definition: fs.h:342
void GNUNET_CONTAINER_heap_destroy(struct GNUNET_CONTAINER_Heap *heap)
Destroys the heap.
Type of a block representing any type of search result (universal).
A 512-bit hashcode.
GSF_PendingRequestReplyHandler rh
Function to call if we encounter a reply.
#define GNUNET_TIME_UNIT_MILLISECONDS
One millisecond.
void GNUNET_DHT_get_stop(struct GNUNET_DHT_GetHandle *get_handle)
Stop async DHT-get.
Definition: dht_api.c:1160
uint32_t type
Desired content type.
Definition: datastore.h:153
static void odc_warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GNUNET_DATASTORE_Handle * GSF_dsh
Our connection to the datastore.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct GNUNET_DHT_GetHandle * gh
DHT request handle for this request (or NULL for none).
The block is obtained from the local database, skip cryptographic checks.
void GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr)
Notify the plan about a request being done; destroy all entries associated with this request...
Node in the heap.
unsigned int GNUNET_CONTAINER_heap_get_size(const struct GNUNET_CONTAINER_Heap *heap)
Get the current size of the heap.
Valid result, and there may be more.
#define CADET_RETRY_MAX
If obtaining a block via cadet fails, how often do we retry it before giving up for good (and stickin...
void GSF_cadet_lookup_(struct GSF_PendingRequest *pr)
Consider downloading via cadet (if possible)
int requested
GNUNET_YES if we had a matching request for this block, GNUNET_NO if not.
unsigned int cadet_retry_count
How often have we retried this request via 'cadet'? (used to bound overall retries).
void GSF_pending_request_done_()
Shutdown the subsystem.
Handle for a request that is going out via cadet API.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_min(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the minimum of two relative time values.
Definition: time.c:271
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:79
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define MAX_RESULTS
Hard limit on the number of results we may get from the datastore per query.
void GNUNET_DHT_get_filter_known_results(struct GNUNET_DHT_GetHandle *get_handle, unsigned int num_results, const struct GNUNET_HashCode *results)
Tell the DHT not to return any of the following known results to this client.
Definition: dht_api.c:1131
int GSF_test_get_load_too_high_(uint32_t priority)
Test if the DATABASE (GET) load on this peer is too high to even consider processing the query at all...
uint32_t original_priority
Priority that this request (originally) had for us.
Values we track for load calculations.
Definition: load.c:35
#define DHT_GET_REPLICATION
Desired replication level for GETs.
GNUNET_BLOCK_EvaluationResult
Possible ways for how a block may relate to a query.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
void GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls)
Iterate over all pending requests.
indexing for the file-sharing service
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_BLOCK_Group * GNUNET_BLOCK_group_create(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, uint32_t nonce, const void *raw_data, size_t raw_data_size,...)
Create a new block group.
Definition: block.c:290
uint32_t type
Type of the block (in big endian).
Definition: fs.h:347
Handle to a GET request.
Definition: dht_api.c:80
struct GNUNET_CONTAINER_Heap * GNUNET_CONTAINER_heap_create(enum GNUNET_CONTAINER_HeapOrder order)
Create a new heap.
struct GNUNET_MQ_Envelope * GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
Generate the message corresponding to the given pending request for transmission to other peers...
static int clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator to free pending requests.
Specified block type not supported by this plugin.
void handle_p2p_put(void *cls, const struct PutMessage *put)
Handle P2P "CONTENT" message.
Allow multiple values with the same key.
unsigned int replies_seen_size
Length of the 'replies_seen' array.
int GSF_pending_request_test_target_(struct GSF_PendingRequest *pr, const struct GNUNET_PeerIdentity *target)
Is the given target a legitimate peer for forwarding the given request?
enum GNUNET_BLOCK_Type type
Type of the block.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
static void cadet_reply_proc(void *cls, enum GNUNET_BLOCK_Type type, struct GNUNET_TIME_Absolute expiration, size_t data_size, const void *data)
Function called with a reply from the cadet.
uint32_t anonymity_level
Anonymity requirements for this reply.
struct GNUNET_DATASTORE_QueueEntry * qe
Datastore queue entry for this request (or NULL for none).
static struct GNUNET_LOAD_Value * datastore_put_load
Datastore 'PUT' load tracking.
The identity of the host (wraps the signing key of the peer).
struct GNUNET_LOAD_Value * GSF_rt_entry_lifetime
How long do requests typically stay in the routing table?
struct GSF_PendingRequestData * GSF_pending_request_get_data_(struct GSF_PendingRequest *pr)
Obtain the public data associated with a pending request.
enum GSF_PendingRequestOptions options
Options for the request.
uint64_t first_uid
Unique ID of the first result from the local datastore; used to terminate the loop.
int GNUNET_CONTAINER_multihashmap_get_multiple(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
#define GNUNET_ALIGN
gcc-ism to force alignment; we use this to align char-arrays that may then be cast to 'struct's...
void GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
Cancel an active request; must not be called after 'proc' was called.
Request must only be forwarded (no routing)
Performance data kept for a peer.
#define MAX_DATASTORE_QUEUE
Maximum size of the datastore queue for P2P operations.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_min(struct GNUNET_TIME_Absolute t1, struct GNUNET_TIME_Absolute t2)
Return the minimum of two absolute time values.
Definition: time.c:302
struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start(struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, size_t xquery_size, GNUNET_DHT_GetIterator iter, void *iter_cls)
Perform an asynchronous GET operation on the DHT identified.
Definition: dht_api.c:1072
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:373
struct GNUNET_BLOCK_Group * bg
Block group for filtering replies we've already seen.
#define GNUNET_LOAD_value_free(lv)
Free a load value.
struct GNUNET_PeerIdentity origin
Request origin.
enum GNUNET_BLOCK_EvaluationResult eval
Evaluation result (returned).
Query format does not match block type (invalid query).
void GSF_dht_lookup_(struct GSF_PendingRequest *pr)
Consider looking up the data in the DHT (anonymity-level permitting).
void GNUNET_PEER_change_rc(GNUNET_PEER_Id id, int delta)
Change the reference counter of an interned PID.
Definition: peer.c:198
#define GNUNET_TIME_UNIT_YEARS
One year (365 days).
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
struct GNUNET_CONTAINER_HeapNode * hnode
Entry for this pending request in the expiration heap, or NULL.
void(* GSF_PendingRequestReplyHandler)(void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, struct GNUNET_TIME_Absolute last_transmission, enum GNUNET_BLOCK_Type type, const void *data, size_t data_len)
Handle a reply to a pending request.
static void process_local_reply(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
We're processing (local) results for a search request from another peer.
Context for put_migration_continuation().
A connected peer.
Data block (leaf) in the CHK tree.
Block group data.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static void handle_dht_reply(void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_HashCode *key, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length, enum GNUNET_BLOCK_Type type, size_t size, const void *data)
Iterator called on each result obtained for a DHT operation that expects a reply. ...
struct GNUNET_TIME_Relative migration_delay
If we get content we already have from this peer, for how long do we block it? Adjusted based on the ...
unsigned int GSF_cover_content_count
How many content messages have we received 'recently' that have not yet been claimed as cover traffic...
struct GSF_PendingRequest * GSF_pending_request_create_(enum GSF_PendingRequestOptions options, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *query, const struct GNUNET_PeerIdentity *target, const char *bf_data, size_t bf_size, uint32_t mingle, uint32_t anonymity_level, uint32_t priority, int32_t ttl, GNUNET_PEER_Id sender_pid, GNUNET_PEER_Id origin_pid, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, void *rh_cls)
Create a new pending request.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:80
static unsigned int anonymity
uint32_t priority
Priority that this request (still) has for us.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
void GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute block_time)
Ask a peer to stop migrating data to us until the given point in time.
An active request.
int GNUNET_FS_handle_on_demand_block(const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid, GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls)
We've received an on-demand encoded block from the datastore.
static struct GNUNET_PeerIdentity pid
Identity of the peer we transmit to / connect to.
struct GSF_CadetRequest * cadet_request
Cadet request handle for this request (or NULL for none).
void GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
Explicitly cancel a pending request.
int GNUNET_CONFIGURATION_get_value_yesno(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option)
Get a configuration value that should be in a set of "YES" or "NO".
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
int request_found
Did we find a matching request?
uint32_t data
The data value.
void * GNUNET_CONTAINER_heap_remove_node(struct GNUNET_CONTAINER_HeapNode *node)
Removes a node from the heap.
GNUNET_BLOCK_EvaluationOptions
Flags that can be set to control the evaluation.
static size_t data_size
Number of bytes in data.
Query is valid, no reply given.
#define GNUNET_MESSAGE_TYPE_FS_GET
P2P request for content (one FS to another).
struct GNUNET_TIME_Relative GNUNET_TIME_relative_saturating_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Saturating multiply relative time by a given factor.
Definition: time.c:499
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
GSF_PendingRequestOptions
Options for pending requests (bits to be ORed).
GNUNET_PEER_Id sender_pid
Identity of the peer that we should use for the 'sender' (recipient of the response) when forwarding ...
#define DATASTORE_LOAD_AUTODECLINE
At what frequency should our datastore load decrease automatically (since if we don't use it...
Each peer along the way should look at 'enc' (otherwise only the k-peers closest to the key should lo...
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_HashCode * replies_seen
Array of hash codes of replies we've already seen.
static int active_to_migration
Are we allowed to migrate content to this peer.
#define GNUNET_free(ptr)
Wrapper around free.
static void call_continuation(struct GSF_PendingRequest *pr)
Time for relative time used by GNUnet, in microseconds.
void GNUNET_BLOCK_group_destroy(struct GNUNET_BLOCK_Group *bg)
Destroy resources used by a block group.
Definition: block.c:206
int GNUNET_BLOCK_group_set_seen(struct GNUNET_BLOCK_Group *bg, const struct GNUNET_HashCode *seen_results, unsigned int seen_results_count)
Update block group to filter out the given results.
Definition: block.c:410
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:965