GNUnet  0.10.x
gnunet-service-fs_pr.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2009-2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
20 
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
32 #include "gnunet-service-fs_pe.h"
33 #include "gnunet-service-fs_pr.h"
35 
36 
40 #define DHT_GET_REPLICATION 5
41 
47 #define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
48 
55 #define CONTENT_BANDWIDTH_VALUE 800
56 
60 #define MAX_RESULTS (100 * 1024)
61 
65 #define INSANE_STATISTICS GNUNET_NO
66 
71 #define CADET_RETRY_MAX 3
72 
73 
82 
87 
91  void *rh_cls;
92 
97 
102 
107 
112 
117 
122 
128 
133 
138 
144 
150 
155 
160 
165 
169  bool seen_null;
170 
175  uint64_t first_uid;
176 
180  size_t result_count;
181 
186  unsigned int cadet_retry_count;
187 
191  unsigned int replies_seen_count;
192 
196  unsigned int replies_seen_size;
197 };
198 
199 
205 
206 
211 
212 
217 
218 
228 
229 
235 static unsigned long long max_pending_requests = (32 * 1024);
236 
237 
248 static void
250 {
251  if (NULL != pr->bg)
252  {
254  pr->bg = NULL;
255  }
256  if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
257  return; /* no need */
258  pr->bg =
260  type,
262  UINT32_MAX),
263  NULL,
264  0,
265  "seen-set-size",
266  pr->replies_seen_count,
267  NULL);
268  if (NULL == pr->bg)
269  return;
272  pr->replies_seen,
273  pr->replies_seen_count));
274 }
275 
276 
298 struct GSF_PendingRequest *
300  enum GNUNET_BLOCK_Type type,
301  const struct GNUNET_HashCode *query,
302  const struct GNUNET_PeerIdentity *target,
303  const char *bf_data,
304  size_t bf_size,
305  uint32_t mingle,
306  uint32_t anonymity_level,
307  uint32_t priority,
308  int32_t ttl,
311  const struct GNUNET_HashCode *replies_seen,
312  unsigned int replies_seen_count,
314  void *rh_cls)
315 {
316  struct GSF_PendingRequest *pr;
317  struct GSF_PendingRequest *dpr;
318  size_t extra;
319  struct GNUNET_HashCode *eptr;
320 
322  "Creating request handle for `%s' of type %d\n",
323  GNUNET_h2s(query),
324  type);
325 #if INSANE_STATISTICS
327  gettext_noop("# Pending requests created"),
328  1,
329  GNUNET_NO);
330 #endif
331  extra = 0;
332  if (NULL != target)
333  extra += sizeof(struct GNUNET_PeerIdentity);
334  pr = GNUNET_malloc(sizeof(struct GSF_PendingRequest) + extra);
335  pr->public_data.query = *query;
336  eptr = (struct GNUNET_HashCode *)&pr[1];
337  if (NULL != target)
338  {
339  pr->public_data.target = (struct GNUNET_PeerIdentity *)eptr;
340  GNUNET_memcpy(eptr, target, sizeof(struct GNUNET_PeerIdentity));
341  }
343  pr->public_data.priority = priority;
344  pr->public_data.original_priority = priority;
346  pr->public_data.type = type;
348  pr->sender_pid = sender_pid;
349  pr->origin_pid = origin_pid;
350  pr->rh = rh;
351  pr->rh_cls = rh_cls;
352  GNUNET_assert((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
353  if (ttl >= 0)
356  else
360  (uint32_t)(-ttl)));
361  if (replies_seen_count > 0)
362  {
364  pr->replies_seen =
367  replies_seen,
368  replies_seen_count * sizeof(struct GNUNET_HashCode));
370  }
371  if ((NULL != bf_data) &&
373  {
375  pr->public_data.type,
376  mingle,
377  bf_data,
378  bf_size,
379  "seen-set-size",
380  0,
381  NULL);
382  }
383  else if ((replies_seen_count > 0) &&
384  (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
385  {
387  }
389  &pr->public_data.query,
390  pr,
392  if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
393  {
394  pr->hnode = GNUNET_CONTAINER_heap_insert(requests_by_expiration_heap,
395  pr,
397  /* make sure we don't track too many requests */
398  while (GNUNET_CONTAINER_heap_get_size(requests_by_expiration_heap) >
400  {
401  dpr = GNUNET_CONTAINER_heap_peek(requests_by_expiration_heap);
402  GNUNET_assert(NULL != dpr);
403  if (pr == dpr)
404  break; /* let the request live briefly... */
405  if (NULL != dpr->rh)
406  dpr->rh(dpr->rh_cls,
408  dpr,
409  UINT32_MAX,
413  NULL,
414  0);
416  }
417  }
419  gettext_noop("# Pending requests active"),
420  1,
421  GNUNET_NO);
422  return pr;
423 }
424 
431 struct GSF_PendingRequestData *
433 {
434  return &pr->public_data;
435 }
436 
437 
447 int
449  struct GSF_PendingRequest *prb)
450 {
451  if ((pra->public_data.type != prb->public_data.type) ||
452  (0 != memcmp(&pra->public_data.query,
453  &prb->public_data.query,
454  sizeof(struct GNUNET_HashCode))))
455  return GNUNET_NO;
456  return GNUNET_OK;
457 }
458 
459 
468 void
470  const struct GNUNET_HashCode *replies_seen,
471  unsigned int replies_seen_count)
472 {
473  if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
474  return; /* integer overflow */
476  {
477  /* we're responsible for the BF, full refresh */
478  if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
480  pr->replies_seen_size,
481  replies_seen_count + pr->replies_seen_count);
483  replies_seen,
484  sizeof(struct GNUNET_HashCode) * replies_seen_count);
487  }
488  else
489  {
490  if (NULL == pr->bg)
491  {
492  /* we're not the initiator, but the initiator did not give us
493  * any bloom-filter, so we need to create one on-the-fly */
495  }
496  else
497  {
500  replies_seen,
501  pr->replies_seen_count));
502  }
503  }
504  if (NULL != pr->gh)
506  replies_seen_count,
507  replies_seen);
508 }
509 
510 
518 struct GNUNET_MQ_Envelope *
520 {
521  struct GNUNET_MQ_Envelope *env;
522  struct GetMessage *gm;
523  struct GNUNET_PeerIdentity *ext;
524  unsigned int k;
525  uint32_t bm;
526  uint32_t prio;
527  size_t bf_size;
528  struct GNUNET_TIME_Absolute now;
529  int64_t ttl;
530  int do_route;
531  void *bf_data;
532  uint32_t bf_nonce;
533 
535  "Building request message for `%s' of type %d\n",
537  pr->public_data.type);
538  k = 0;
539  bm = 0;
540  do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
541  if ((!do_route) && (pr->sender_pid == 0))
542  {
543  GNUNET_break(0);
544  do_route = GNUNET_YES;
545  }
546  if (!do_route)
547  {
549  k++;
550  }
551  if (NULL != pr->public_data.target)
552  {
554  k++;
555  }
556  if (GNUNET_OK !=
557  GNUNET_BLOCK_group_serialize(pr->bg, &bf_nonce, &bf_data, &bf_size))
558  {
559  bf_size = 0;
560  bf_data = NULL;
561  }
562  env = GNUNET_MQ_msg_extra(gm,
563  bf_size + k * sizeof(struct GNUNET_PeerIdentity),
565  gm->type = htonl(pr->public_data.type);
566  if (do_route)
568  pr->public_data.priority + 1);
569  else
570  prio = 0;
571  pr->public_data.priority -= prio;
573  pr->public_data.respect_offered += prio;
574  gm->priority = htonl(prio);
575  now = GNUNET_TIME_absolute_get();
576  ttl = (int64_t)(pr->public_data.ttl.abs_value_us - now.abs_value_us);
577  gm->ttl = htonl(ttl / 1000LL / 1000LL);
578  gm->filter_mutator = htonl(bf_nonce);
579  gm->hash_bitmap = htonl(bm);
580  gm->query = pr->public_data.query;
581  ext = (struct GNUNET_PeerIdentity *)&gm[1];
582  k = 0;
583  if (!do_route)
584  GNUNET_PEER_resolve(pr->sender_pid, &ext[k++]);
585  if (NULL != pr->public_data.target)
586  ext[k++] = *pr->public_data.target;
587  GNUNET_memcpy(&ext[k], bf_data, bf_size);
588  GNUNET_free_non_null(bf_data);
589  return env;
590 }
591 
592 
/**
 * Release everything attached to one pending request and free it.
 *
 * The visible statements clear the request's CADET request reference,
 * invoke a still-pending local-lookup continuation (llc_cont) with the
 * request's local_result, reset the block group, sender/origin peer
 * IDs, heap node, datastore queue entry and warning task, stop an
 * active DHT GET, and finally free the request itself.
 *
 * NOTE(review): several lines of this function are absent from this
 * listing (among them the declaration of `cont` and the calls that
 * actually cancel/destroy the individual resources) -- confirm the
 * details against the original file.
 *
 * @param cls closure; not referenced in the visible lines
 * @param key hash of the query, used here only for the debug log text
 * @param value the `struct GSF_PendingRequest` to clean up
 * @return #GNUNET_YES (always, on the visible return path)
 */
601 static int
602 clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
603 {
604  struct GSF_PendingRequest *pr = value;
606 
608  "Cleaning up pending request for `%s'.\n",
609  GNUNET_h2s(key));
610  if (NULL != pr->cadet_request)
611  {
614  pr->cadet_request = NULL;
615  }
/* clear llc_cont before calling it, so the continuation is only ever
   invoked once for this request */
616  if (NULL != (cont = pr->llc_cont))
617  {
618  pr->llc_cont = NULL;
619  cont(pr->llc_cont_cls, pr, pr->local_result);
620  }
624  pr->bg = NULL;
626  pr->sender_pid = 0;
628  pr->origin_pid = 0;
629  if (NULL != pr->hnode)
630  {
632  pr->hnode = NULL;
633  }
634  if (NULL != pr->qe)
635  {
637  pr->qe = NULL;
638  }
639  if (NULL != pr->gh)
640  {
641  GNUNET_DHT_get_stop(pr->gh);
642  pr->gh = NULL;
643  }
644  if (NULL != pr->warn_task)
645  {
647  pr->warn_task = NULL;
648  }
/* NOTE(review): the statements around here are only partially visible;
   the "# Pending requests active" counter is adjusted by -1 -- confirm */
650  GNUNET_OK ==
653  gettext_noop("# Pending requests active"),
654  -1,
655  GNUNET_NO);
656  GNUNET_free(pr);
657  return GNUNET_YES;
658 }
659 
660 
/**
 * Cancel a pending request, either partially or completely.
 *
 * Does nothing when `pr_map` is NULL.  With `full_cleanup` equal to
 * #GNUNET_NO the request is only made inactive: its reply handler is
 * cleared, a pending local-lookup continuation is invoked once, and the
 * CADET request, datastore queue entry, DHT GET and warning task
 * references are reset, while the request itself is kept.  Otherwise
 * the request is handed to clean_request().
 *
 * NOTE(review): some lines (e.g. the declaration of `cont` and the
 * individual cancel calls) are absent from this listing -- confirm the
 * details against the original file.
 *
 * @param pr request to cancel
 * @param full_cleanup #GNUNET_NO to merely deactivate the request;
 *        any other value takes the clean_request() path
 */
667 void
668 GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
669 {
671 
672  if (NULL == pr_map)
673  return; /* already cleaned up! */
674  if (GNUNET_NO == full_cleanup)
675  {
676  /* make request inactive (we're no longer interested in more results),
677  * but do NOT remove from our data-structures, we still need it there
678  * to prevent the request from looping */
679  pr->rh = NULL;
680  if (NULL != pr->cadet_request)
681  {
684  pr->cadet_request = NULL;
685  }
/* clear llc_cont before calling it, so it cannot be invoked twice */
686  if (NULL != (cont = pr->llc_cont))
687  {
688  pr->llc_cont = NULL;
689  cont(pr->llc_cont_cls, pr, pr->local_result);
690  }
692  if (NULL != pr->qe)
693  {
695  pr->qe = NULL;
696  }
697  if (NULL != pr->gh)
698  {
699  GNUNET_DHT_get_stop(pr->gh);
700  pr->gh = NULL;
701  }
702  if (NULL != pr->warn_task)
703  {
705  pr->warn_task = NULL;
706  }
707  return;
708  }
/* full cleanup: the request's own query hash is passed as the key;
   NOTE(review): the opening of this statement is missing from the
   listing -- confirm how the return value is checked */
710  clean_request(NULL, &pr->public_data.query, pr));
711 }
712 
713 
720 void
722 {
724  pr_map,
726  cls);
727 }
728 
729 
737  const void *data;
738 
743 
748 
752  size_t size;
753 
758 
763 
767  uint32_t priority;
768 
772  uint32_t anonymity_level;
773 
778 
783 };
784 
785 
793 static void
795  struct GSF_PendingRequest *pr)
796 {
797  if (prq->sender == NULL)
798  return;
801  prq->priority);
802 }
803 
804 
813 static int
814 process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
815 {
816  struct ProcessReplyClosure *prq = cls;
817  struct GSF_PendingRequest *pr = value;
818  struct GNUNET_HashCode chash;
819  struct GNUNET_TIME_Absolute last_transmission;
820 
821  if (NULL == pr->rh)
822  return GNUNET_YES;
824  "Matched result (type %u) for query `%s' with pending request\n",
825  (unsigned int)prq->type,
826  GNUNET_h2s(key));
828  gettext_noop("# replies received and matched"),
829  1,
830  GNUNET_NO);
832  prq->type,
833  pr->bg,
834  prq->eo,
835  key,
836  NULL,
837  0,
838  prq->data,
839  prq->size);
840  switch (prq->eval)
841  {
844  break;
845 
847  /* short cut: stop processing early, no BF-update, etc. */
852  .rel_value_us);
853  if (GNUNET_YES !=
855  .pr_head,
856  prq->sender,
857  &last_transmission))
858  last_transmission.abs_value_us =
859  GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
860  /* pass on to other peers / local clients */
861  pr->rh(pr->rh_cls,
862  prq->eval,
863  pr,
864  prq->anonymity_level,
865  prq->expiration,
866  last_transmission,
867  prq->type,
868  prq->data,
869  prq->size);
870  return GNUNET_YES;
871 
873 #if INSANE_STATISTICS
875  gettext_noop(
876  "# duplicate replies discarded (bloomfilter)"),
877  1,
878  GNUNET_NO);
879 #endif
880  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
881  return GNUNET_YES; /* duplicate */
882 
885  gettext_noop("# irrelevant replies discarded"),
886  1,
887  GNUNET_NO);
888  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
889  return GNUNET_YES;
890 
892  return GNUNET_YES; /* wrong namespace */
893 
895  GNUNET_break(0);
896  return GNUNET_YES;
897 
899  GNUNET_break(0);
900  return GNUNET_YES;
901 
904  _("Unsupported block type %u\n"),
905  prq->type);
906  return GNUNET_NO;
907  }
908  /* update bloomfilter */
909  GNUNET_CRYPTO_hash(prq->data, prq->size, &chash);
910  GSF_pending_request_update_(pr, &chash, 1);
911  if (NULL == prq->sender)
912  {
914  "Found result for query `%s' in local datastore\n",
915  GNUNET_h2s(key));
917  gettext_noop("# results found locally"),
918  1,
919  GNUNET_NO);
920  }
921  else
922  {
923  GSF_dht_lookup_(pr);
924  }
926  pr->public_data.priority = 0;
929  prq->request_found = GNUNET_YES;
930  /* finally, pass on to other peer / local client */
932  .pr_head,
933  prq->sender,
934  &last_transmission))
935  last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
936  pr->rh(pr->rh_cls,
937  prq->eval,
938  pr,
939  prq->anonymity_level,
940  prq->expiration,
941  last_transmission,
942  prq->type,
943  prq->data,
944  prq->size);
945  return GNUNET_YES;
946 }
947 
948 
957 
961  struct GNUNET_PeerIdentity origin;
962 
968 };
969 
970 
980 static void
982  int success,
984  const char *msg)
985 {
986  struct PutMigrationContext *pmc = cls;
987  struct GSF_ConnectedPeer *cp;
988  struct GNUNET_TIME_Relative mig_pause;
989  struct GSF_PeerPerformanceData *ppd;
990 
991  if (NULL != datastore_put_load)
992  {
993  if (GNUNET_SYSERR != success)
994  {
995  GNUNET_LOAD_update(datastore_put_load,
997  .rel_value_us);
998  }
999  else
1000  {
1001  /* on queue failure / timeout, increase the put load dramatically */
1002  GNUNET_LOAD_update(datastore_put_load,
1003  GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1004  }
1005  }
1006  cp = GSF_peer_get_(&pmc->origin);
1007  if (GNUNET_OK == success)
1008  {
1009  if (NULL != cp)
1010  {
1012  ppd->migration_delay.rel_value_us /= 2;
1013  }
1014  GNUNET_free(pmc);
1015  return;
1016  }
1017  if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1018  {
1020  if (min_expiration.abs_value_us > 0)
1021  {
1023  "Asking to stop migration for %s because datastore is full\n",
1025  GNUNET_TIME_absolute_get_remaining(min_expiration),
1026  GNUNET_YES));
1027  GSF_block_peer_migration_(cp, min_expiration);
1028  }
1029  else
1030  {
1032  ppd->migration_delay);
1033  ppd->migration_delay =
1035  mig_pause.rel_value_us =
1038  ppd->migration_delay =
1040  GNUNET_log(
1042  "Replicated content already exists locally, asking to stop migration for %s\n",
1046  }
1047  }
1048  GNUNET_free(pmc);
1050  gettext_noop("# Datastore `PUT' failures"),
1051  1,
1052  GNUNET_NO);
1053 }
1054 
1055 
/**
 * Decide whether the datastore PUT load is too high to accept a store
 * request of the given priority.
 *
 * @param priority priority of the content; the load must reach
 *        2.0 * (1 + priority) before the request is declined
 * @return #GNUNET_NO if there is no `datastore_put_load` tracker, if
 *         its average is below 50, or if its load is below the
 *         priority-dependent threshold; #GNUNET_YES otherwise
 */
1065 static int
1066 test_put_load_too_high(uint32_t priority)
1067 {
1068  double ld;
1069 
1070  if (NULL == datastore_put_load)
1071  return GNUNET_NO;
1072  if (GNUNET_LOAD_get_average(datastore_put_load) < 50)
1073  return GNUNET_NO; /* very fast */
1074  ld = GNUNET_LOAD_get_load(datastore_put_load);
1075  if (ld < 2.0 * (1 + priority))
1076  return GNUNET_NO;
/* NOTE(review): the first line of this call is missing from the
   listing; presumably a statistics counter update -- confirm */
1078  gettext_noop(
1079  "# storage requests dropped due to high load"),
1080  1,
1081  GNUNET_NO);
1082  return GNUNET_YES;
1083 }
1084 
1085 
1101 static void
1103  struct GNUNET_TIME_Absolute exp,
1104  const struct GNUNET_HashCode *key,
1105  const struct GNUNET_PeerIdentity *get_path,
1106  unsigned int get_path_length,
1107  const struct GNUNET_PeerIdentity *put_path,
1108  unsigned int put_path_length,
1109  enum GNUNET_BLOCK_Type type,
1110  size_t size,
1111  const void *data)
1112 {
1113  struct GSF_PendingRequest *pr = cls;
1114  struct ProcessReplyClosure prq;
1115  struct PutMigrationContext *pmc;
1116 
1118  gettext_noop("# Replies received from DHT"),
1119  1,
1120  GNUNET_NO);
1121  memset(&prq, 0, sizeof(prq));
1122  prq.data = data;
1123  prq.expiration = exp;
1124  /* do not allow migrated content to live longer than 1 year */
1127  prq.expiration);
1128  prq.size = size;
1129  prq.type = type;
1130  prq.eo = GNUNET_BLOCK_EO_NONE;
1131  process_reply(&prq, key, pr);
1132  if ((GNUNET_YES == active_to_migration) &&
1134  {
1136  "Replicating result for query `%s' with priority %u\n",
1137  GNUNET_h2s(key),
1138  prq.priority);
1139  pmc = GNUNET_new(struct PutMigrationContext);
1141  pmc->requested = GNUNET_YES;
1142  if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1143  0,
1144  key,
1145  size,
1146  data,
1147  type,
1148  prq.priority,
1149  1 /* anonymity */,
1150  0 /* replication */,
1151  exp,
1152  1 + prq.priority,
1155  pmc))
1156  {
1158  GNUNET_SYSERR,
1160  NULL);
1161  }
1162  }
1163 }
1164 
1165 
1171 void
1173 {
1174  const void *xquery;
1175  size_t xquery_size;
1176  struct GNUNET_PeerIdentity pi;
1177  char buf[sizeof(struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1178 
1179  if (0 != pr->public_data.anonymity_level)
1180  return;
1181  if (NULL != pr->gh)
1182  {
1183  GNUNET_DHT_get_stop(pr->gh);
1184  pr->gh = NULL;
1185  }
1186  xquery = NULL;
1187  xquery_size = 0;
1188  if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1189  {
1190  GNUNET_assert(0 != pr->sender_pid);
1191  GNUNET_PEER_resolve(pr->sender_pid, &pi);
1192  GNUNET_memcpy(&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1193  xquery_size += sizeof(struct GNUNET_PeerIdentity);
1194  }
1196  pr->public_data.type,
1197  &pr->public_data.query,
1200  xquery,
1201  xquery_size,
1203  pr);
1204  if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1206  pr->replies_seen_count,
1207  pr->replies_seen);
1208 }
1209 
1210 
1220 static void
1222  enum GNUNET_BLOCK_Type type,
1224  size_t data_size,
1225  const void *data)
1226 {
1227  struct GSF_PendingRequest *pr = cls;
1228  struct ProcessReplyClosure prq;
1229  struct GNUNET_HashCode query;
1230 
1231  pr->cadet_request = NULL;
1232  if (GNUNET_BLOCK_TYPE_ANY == type)
1233  {
1234  GNUNET_break(NULL == data);
1235  GNUNET_break(0 == data_size);
1236  pr->cadet_retry_count++;
1238  return; /* give up on cadet */
1239  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1240  /* retry -- without delay, as this is non-anonymous
1241  and cadet/cadet connect will take some time anyway */
1243  &pr->public_data.query,
1244  pr->public_data.type,
1246  pr);
1247  return;
1248  }
1249  if (GNUNET_YES !=
1250  GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, data_size, &query))
1251  {
1253  "Failed to derive key for block of type %d\n",
1254  (int)type);
1255  GNUNET_break_op(0);
1256  return;
1257  }
1259  gettext_noop("# Replies received from CADET"),
1260  1,
1261  GNUNET_NO);
1262  memset(&prq, 0, sizeof(prq));
1263  prq.data = data;
1264  prq.expiration = expiration;
1265  /* do not allow migrated content to live longer than 1 year */
1268  prq.expiration);
1269  prq.size = data_size;
1270  prq.type = type;
1271  prq.eo = GNUNET_BLOCK_EO_NONE;
1272  process_reply(&prq, &query, pr);
1273 }
1274 
1275 
1281 void
1283 {
1284  if (0 != pr->public_data.anonymity_level)
1285  return;
1286  if (0 == pr->public_data.target)
1287  {
1289  "Cannot do cadet-based download, target peer not known\n");
1290  return;
1291  }
1292  if (NULL != pr->cadet_request)
1293  return;
1295  &pr->public_data.query,
1296  pr->public_data.type,
1298  pr);
1299 }
1300 
1301 
1307 static void
1309 {
1310  struct GSF_PendingRequest *pr = cls;
1311 
1313  _("Datastore lookup already took %s!\n"),
1316  GNUNET_YES));
1318  &warn_delay_task,
1319  pr);
1320 }
1321 
1322 
1328 static void
1330 {
1331  struct GSF_PendingRequest *pr = cls;
1332 
1334  _("On-demand lookup already took %s!\n"),
1337  GNUNET_YES));
1340  pr);
1341 }
1342 
1343 
1344 /* Call our continuation (if we have any) */
1345 static void
1347 {
1349 
1350  GNUNET_assert(NULL == pr->qe);
1351  if (NULL != pr->warn_task)
1352  {
1354  pr->warn_task = NULL;
1355  }
1356  if (NULL == cont)
1357  return; /* no continuation */
1358  pr->llc_cont = NULL;
1359  if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1360  {
1362  {
1363  /* Signal that we are done and that there won't be any
1364  additional results to allow client to clean up state. */
1365  pr->rh(pr->rh_cls,
1367  pr,
1368  UINT32_MAX,
1372  NULL,
1373  0);
1374  }
1375  /* Finally, call our continuation to signal that we are
1376  done with local processing of this request; i.e. to
1377  start reading again from the client. */
1379  return;
1380  }
1381 
1382  cont(pr->llc_cont_cls, pr, pr->local_result);
1383 }
1384 
1385 
1386 /* Update stats and call continuation */
1387 static void
1389 {
1391  "No further local responses available.\n");
1392 #if INSANE_STATISTICS
1396  gettext_noop(
1397  "# requested DBLOCK or IBLOCK not found"),
1398  1,
1399  GNUNET_NO);
1400 #endif
1401  call_continuation(pr);
1402 }
1403 
1404 
1405 /* forward declaration */
1406 static void
1407 process_local_reply(void *cls,
1408  const struct GNUNET_HashCode *key,
1409  size_t size,
1410  const void *data,
1411  enum GNUNET_BLOCK_Type type,
1412  uint32_t priority,
1413  uint32_t anonymity,
1414  uint32_t replication,
1416  uint64_t uid);
1417 
1418 
1419 /* Start a local query */
1420 static void
1422  uint64_t next_uid,
1423  bool random)
1424 {
1427  &warn_delay_task,
1428  pr);
1430  next_uid,
1431  random,
1432  &pr->public_data.query,
1433  pr->public_data.type ==
1436  : pr->public_data.type,
1438  pr->public_data.options))
1439  ? UINT_MAX
1440  : 1
1441  /* queue priority */,
1443  pr->public_data.options))
1444  ? UINT_MAX
1446  /* max queue size */,
1448  pr);
1449  if (NULL != pr->qe)
1450  return;
1451  GNUNET_log(
1453  "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1455  pr->public_data.type,
1456  (unsigned long long)next_uid);
1458  gettext_noop(
1459  "# Datastore lookups concluded (error queueing)"),
1460  1,
1461  GNUNET_NO);
1462  call_continuation(pr);
1463 }
1464 
1465 
1484 static void
1486  const struct GNUNET_HashCode *key,
1487  size_t size,
1488  const void *data,
1489  enum GNUNET_BLOCK_Type type,
1490  uint32_t priority,
1491  uint32_t anonymity,
1492  uint32_t replication,
1493  struct GNUNET_TIME_Absolute expiration,
1494  uint64_t uid)
1495 {
1496  struct GSF_PendingRequest *pr = cls;
1497  struct ProcessReplyClosure prq;
1498  struct GNUNET_HashCode query;
1499  unsigned int old_rf;
1500 
1502  pr->warn_task = NULL;
1503  if (NULL == pr->qe)
1504  goto called_from_on_demand;
1505  pr->qe = NULL;
1506  if (
1507  (NULL == key) && pr->seen_null &&
1508  !pr->have_first_uid) /* We have hit the end for the 2nd time with no results */
1509  {
1510  /* No results */
1511 #if INSANE_STATISTICS
1513  gettext_noop(
1514  "# Datastore lookups concluded (no results)"),
1515  1,
1516  GNUNET_NO);
1517 #endif
1519  return;
1520  }
1521  if (((NULL == key) &&
1522  pr->seen_null) || /* We have hit the end for the 2nd time OR */
1523  (pr->seen_null && pr->have_first_uid &&
1524  (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1525  {
1526  /* Seen all results */
1528  gettext_noop(
1529  "# Datastore lookups concluded (seen all)"),
1530  1,
1531  GNUNET_NO);
1533  return;
1534  }
1535  if (NULL == key)
1536  {
1537  GNUNET_assert(!pr->seen_null);
1538  pr->seen_null = true;
1539  start_local_query(pr, 0 /* next_uid */, false /* random */);
1540  return;
1541  }
1542  if (!pr->have_first_uid)
1543  {
1544  pr->first_uid = uid;
1545  pr->have_first_uid = true;
1546  }
1547  pr->result_count++;
1548  if (pr->result_count > MAX_RESULTS)
1549  {
1551  GSF_stats,
1552  gettext_noop("# Datastore lookups aborted (more than MAX_RESULTS)"),
1553  1,
1554  GNUNET_NO);
1556  return;
1557  }
1559  "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1560  GNUNET_h2s(key),
1561  type,
1562  (unsigned long long)uid);
1563  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1564  {
1566  "Found ONDEMAND block, performing on-demand encoding\n");
1568  gettext_noop(
1569  "# on-demand blocks matched requests"),
1570  1,
1571  GNUNET_NO);
1575  pr);
1577  size,
1578  data,
1579  type,
1580  priority,
1581  anonymity,
1582  replication,
1583  expiration,
1584  uid,
1586  pr))
1587  {
1589  gettext_noop(
1590  "# on-demand lookups performed successfully"),
1591  1,
1592  GNUNET_NO);
1593  return; /* we're done */
1594  }
1596  gettext_noop("# on-demand lookups failed"),
1597  1,
1598  GNUNET_NO);
1600  start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1601  return;
1602  }
1603 called_from_on_demand:
1604  old_rf = pr->public_data.results_found;
1605  memset(&prq, 0, sizeof(prq));
1606  prq.data = data;
1607  prq.expiration = expiration;
1608  prq.size = size;
1609  if (GNUNET_OK !=
1610  GNUNET_BLOCK_get_key(GSF_block_ctx, type, data, size, &query))
1611  {
1612  GNUNET_break(0);
1614  key,
1615  size,
1616  data,
1617  UINT_MAX,
1618  UINT_MAX,
1619  NULL,
1620  NULL);
1621  start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1622  return;
1623  }
1624  prq.type = type;
1625  prq.priority = priority;
1626  prq.request_found = GNUNET_NO;
1627  prq.anonymity_level = anonymity;
1628  if ((0 == old_rf) && (0 == pr->public_data.results_found))
1631  process_reply(&prq, key, pr);
1632  pr->local_result = prq.eval;
1634  {
1636  GSF_stats,
1637  gettext_noop("# Datastore lookups concluded (found last result)"),
1638  1,
1639  GNUNET_NO);
1640  call_continuation(pr);
1641  return;
1642  }
1643  if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1645  (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1646  {
1647  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1649  gettext_noop(
1650  "# Datastore lookups concluded (load too high)"),
1651  1,
1652  GNUNET_NO);
1653  call_continuation(pr);
1654  return;
1655  }
1656  start_local_query(pr, uid + 1 /* next_uid */, false /* random */);
1657 }
1658 
1659 
1667 int
1669  const struct GNUNET_PeerIdentity *target)
1670 {
1671  struct GNUNET_PeerIdentity pi;
1672 
1673  if (0 == pr->origin_pid)
1674  return GNUNET_YES;
1675  GNUNET_PEER_resolve(pr->origin_pid, &pi);
1676  return (0 == memcmp(&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1677  ? GNUNET_NO
1678  : GNUNET_YES;
1679 }
1680 
1681 
1689 void
1692  void *cont_cls)
1693 {
1694  GNUNET_assert(NULL == pr->gh);
1695  GNUNET_assert(NULL == pr->cadet_request);
1696  GNUNET_assert(NULL == pr->llc_cont);
1697  pr->llc_cont = cont;
1698  pr->llc_cont_cls = cont_cls;
1699 #if INSANE_STATISTICS
1701  gettext_noop("# Datastore lookups initiated"),
1702  1,
1703  GNUNET_NO);
1704 #endif
1705  start_local_query(pr, 0 /* next_uid */, true /* random */);
1706 }
1707 
1708 
1718 void
1719 handle_p2p_put(void *cls, const struct PutMessage *put)
1720 {
1721  struct GSF_ConnectedPeer *cp = cls;
1722  uint16_t msize;
1723  size_t dsize;
1724  enum GNUNET_BLOCK_Type type;
1725  struct GNUNET_TIME_Absolute expiration;
1726  struct GNUNET_HashCode query;
1727  struct ProcessReplyClosure prq;
1728  struct GNUNET_TIME_Relative block_time;
1729  double putl;
1730  struct PutMigrationContext *pmc;
1731 
1733  "Received P2P PUT from %s\n",
1736  msize = ntohs(put->header.size);
1737  dsize = msize - sizeof(struct PutMessage);
1738  type = ntohl(put->type);
1739  expiration = GNUNET_TIME_absolute_ntoh(put->expiration);
1740  /* do not allow migrated content to live longer than 1 year */
1743  expiration);
1744  if (GNUNET_OK !=
1745  GNUNET_BLOCK_get_key(GSF_block_ctx, type, &put[1], dsize, &query))
1746  {
1747  GNUNET_break_op(0);
1748  return;
1749  }
1751  gettext_noop("# GAP PUT messages received"),
1752  1,
1753  GNUNET_NO);
1754  /* now, lookup 'query' */
1755  prq.data = (const void *)&put[1];
1756  prq.sender = cp;
1757  prq.size = dsize;
1758  prq.type = type;
1759  prq.expiration = expiration;
1760  prq.priority = 0;
1761  prq.anonymity_level = UINT32_MAX;
1762  prq.request_found = GNUNET_NO;
1763  prq.eo = GNUNET_BLOCK_EO_NONE;
1765  &query,
1766  &process_reply,
1767  &prq);
1768  if (NULL != cp)
1769  {
1772  1000 * prq.priority);
1774  }
1775  if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1777  {
1779  "Replicating result for query `%s' with priority %u\n",
1780  GNUNET_h2s(&query),
1781  prq.priority);
1782  pmc = GNUNET_new(struct PutMigrationContext);
1784  pmc->requested = prq.request_found;
1787  &pmc->origin);
1788  if (NULL == GNUNET_DATASTORE_put(GSF_dsh,
1789  0,
1790  &query,
1791  dsize,
1792  &put[1],
1793  type,
1794  prq.priority,
1795  1 /* anonymity */,
1796  0 /* replication */,
1797  expiration,
1798  1 + prq.priority,
1801  pmc))
1802  {
1804  GNUNET_SYSERR,
1806  NULL);
1807  }
1808  }
1809  else if (NULL != cp)
1810  {
1812  "Choosing not to keep content `%s' (%d/%d)\n",
1813  GNUNET_h2s(&query),
1816  }
1817  putl = GNUNET_LOAD_get_load(datastore_put_load);
1818  if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1820  (putl > 2.5 * (1 + prq.priority))))
1821  {
1824  block_time = GNUNET_TIME_relative_multiply(
1827  (unsigned int)(60000 * putl * putl)));
1828  GNUNET_log(
1830  "Asking to stop migration for %s because of load %f and events %d/%d\n",
1832  putl,
1834  (GNUNET_NO == prq.request_found));
1836  GNUNET_TIME_relative_to_absolute(block_time));
1837  }
1838 }
1839 
1840 
1847 int
1849 {
1850  return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1851 }
1852 
1853 
1857 void
1859 {
1860  if (GNUNET_OK !=
1862  "fs",
1863  "MAX_PENDING_REQUESTS",
1865  {
1867  "fs",
1868  "MAX_PENDING_REQUESTS");
1869  }
1871  GNUNET_CONFIGURATION_get_value_yesno(GSF_cfg, "FS", "CONTENT_CACHING");
1872  datastore_put_load = GNUNET_LOAD_value_init(DATASTORE_LOAD_AUTODECLINE);
1873  pr_map = GNUNET_CONTAINER_multihashmap_create(32 * 1024, GNUNET_YES);
1874  requests_by_expiration_heap =
1876 }
1877 
1878 
1882 void
1884 {
1887  pr_map = NULL;
1888  GNUNET_CONTAINER_heap_destroy(requests_by_expiration_heap);
1889  requests_by_expiration_heap = NULL;
1890  GNUNET_LOAD_value_free(datastore_put_load);
1891  datastore_put_load = NULL;
1892 }
1893 
1894 
1895 /* end of gnunet-service-fs_pr.c */
void GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp, uint64_t pref)
Notify core about a preference we have for the given peer (to allocate more resources towards it)...
Block does not match query (invalid result)
int(* GNUNET_CONTAINER_MulitHashMapIteratorCallback)(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator over hash map entries.
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
API to handle 'connected peers'.
struct GSF_ConnectedPeer * sender
Who gave us this reply? NULL for local host (or DHT)
const struct GNUNET_PeerIdentity * target
Identity of a peer hosting the content, otherwise NULL.
static void warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GSF_CadetRequest * GSF_cadet_query(const struct GNUNET_PeerIdentity *target, const struct GNUNET_HashCode *query, enum GNUNET_BLOCK_Type type, GSF_CadetReplyProcessor proc, void *proc_cls)
Look for a block by directly contacting a particular peer.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_subtract(struct GNUNET_TIME_Absolute start, struct GNUNET_TIME_Relative duration)
Subtract a given relative duration from the given start time.
Definition: time.c:420
const void * data
The data for the reply.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static char * expiration
Credential TTL.
#define GNUNET_TIME_UNIT_HOURS
One hour.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:671
uint32_t priority
How much was this reply worth to us?
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
int GNUNET_CONFIGURATION_get_value_number(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, unsigned long long *number)
Get a configuration value that should be a number.
uint64_t rel_value_us
The actual value.
struct GNUNET_LOAD_Value * GNUNET_LOAD_value_init(struct GNUNET_TIME_Relative autodecline)
Create a new load value.
Definition: load.c:122
Request is allowed to refresh bloomfilter and change mingle value.
int GSF_request_plan_reference_get_last_transmission_(struct GSF_PendingRequestPlanBijection *pr_head, struct GSF_ConnectedPeer *sender, struct GNUNET_TIME_Absolute *result)
Get the last transmission attempt time for the request plan list referenced by pr_head, that was sent to sender.
void GSF_update_datastore_delay_(struct GNUNET_TIME_Absolute start)
We've just now completed a datastore request.
Any type of block, used as a wildcard when searching.
Response from FS service with a result for a previous FS search.
Definition: fs.h:321
struct GNUNET_CONTAINER_HeapNode * GNUNET_CONTAINER_heap_insert(struct GNUNET_CONTAINER_Heap *heap, void *element, GNUNET_CONTAINER_HeapCostType cost)
Inserts a new element into the heap.
struct GNUNET_GETOPT_CommandLineOption options[]
Definition: 002.c:5
struct GNUNET_TIME_AbsoluteNBO expiration
When does this result expire?
Definition: fs.h:335
uint32_t num_transmissions
Counter for how often this request has been transmitted (estimate, because we might have the same req...
#define GET_MESSAGE_BIT_TRANSMIT_TO
The peer identity of a peer that had claimed to have the content previously is included (can be used ...
int32_t ttl
Relative time to live in MILLISECONDS (network byte order)
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
struct GNUNET_TIME_Absolute expiration
When the reply expires.
uint32_t priority
How important is this request (network byte order)
bool seen_null
Have we seen a NULL result yet?
struct GNUNET_TIME_Absolute ttl
Current TTL for the request.
uint64_t GNUNET_CRYPTO_random_u64(enum GNUNET_CRYPTO_Quality mode, uint64_t max)
Random on unsigned 64-bit values.
#define GNUNET_TIME_UNIT_MINUTES
One minute.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:287
static void no_more_local_results(struct GSF_PendingRequest *pr)
enum GNUNET_BLOCK_EvaluationOptions eo
Control flags for evaluation.
static int start
Set if we are to start default services (including ARM).
Definition: gnunet-arm.c:39
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
Closure for process_reply() function.
struct GNUNET_TIME_Absolute GNUNET_TIME_relative_to_absolute(struct GNUNET_TIME_Relative rel)
Convert relative time to an absolute time in the future.
Definition: time.c:246
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_TIME_UNIT_SECONDS
One second.
unsigned int results_found
Number of results we have found for this request so far.
static void put_migration_continuation(void *cls, int success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_SCHEDULER_Task * warn_task
Task that warns us if the local datastore lookup takes too long.
uint32_t anonymity_level
Desired anonymity level.
double GNUNET_LOAD_get_load(struct GNUNET_LOAD_Value *load)
Get the current load.
Definition: load.c:199
static unsigned int replication
static struct GNUNET_CONTAINER_Heap * requests_by_expiration_heap
Heap with the request that will expire next at the top.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static struct GNUNET_CONTAINER_MultiHashMap * pr_map
All pending requests, ordered by the query.
struct GSF_PendingRequestPlanBijection * pr_head
Fields for the plan module to track a DLL with the request.
struct GSF_PendingRequestData public_data
Public data for the request.
Block does not match xquery (valid result, not relevant for the request)
struct GNUNET_DHT_Handle * GSF_dht
Handle for DHT operations.
bool have_first_uid
Do we have a first UID yet?
Request priority is allowed to be exceeded.
struct GNUNET_STATISTICS_Handle * GSF_stats
Handle for reporting statistics.
#define GNUNET_NO
Definition: gnunet_common.h:78
double GNUNET_LOAD_get_average(struct GNUNET_LOAD_Value *load)
Get the average value given to update so far.
Definition: load.c:214
shared data structures of gnunet-service-fs.c
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
#define GNUNET_free_non_null(ptr)
Free the memory pointed to by ptr if ptr is not NULL.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
int GSF_pending_request_is_compatible_(struct GSF_PendingRequest *pra, struct GSF_PendingRequest *prb)
Test if two pending requests are compatible (would generate the same query modulo filters and should ...
Default behavior.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
struct GNUNET_TIME_Absolute start_time
When did we start with the request.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
static void refresh_bloomfilter(enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
Recalculate our bloom filter for filtering replies.
static int test_put_load_too_high(uint32_t priority)
Test if the DATABASE (PUT) load on this peer is too high to even consider processing the query at all...
static int process_reply(void *cls, const struct GNUNET_HashCode *key, void *value)
We have received a reply; handle it!
static void update_request_performance_data(struct ProcessReplyClosure *prq, struct GSF_PendingRequest *pr)
Update the performance data for the sender (if any) since the sender successfully answered one of our...
void GNUNET_PEER_resolve(GNUNET_PEER_Id id, struct GNUNET_PeerIdentity *pid)
Convert an interned PID to a normal peer identity.
Definition: peer.c:225
uint64_t abs_value_us
The actual value.
const struct GNUNET_CONFIGURATION_Handle * GSF_cfg
Our configuration.
void GSF_pending_request_update_(struct GSF_PendingRequest *pr, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count)
Update a given pending request with additional replies that have been seen.
Internal representation of the hash map.
static unsigned int anonymity_level
Anonymity level option to use for publishing.
void GSF_pending_request_init_()
Setup the subsystem.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
GSF_LocalLookupContinuation llc_cont
Function to call upon completion of the local get request, or NULL for none.
size_t size
Size of data.
uint32_t filter_mutator
The content hash should be mutated using this value before checking against the bloomfilter (used to ...
Type of a block representing a block to be encoded on demand from disk.
void GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute request_time, uint32_t request_priority)
Report on receiving a reply; update the performance record of the given peer.
int GNUNET_BLOCK_get_key(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, const void *block, size_t block_size, struct GNUNET_HashCode *key)
Function called to obtain the key for a block.
Definition: block.c:377
void GNUNET_LOAD_update(struct GNUNET_LOAD_Value *load, uint64_t data)
Update the current load.
Definition: load.c:235
size_t result_count
Result count.
#define GNUNET_TIME_UNIT_FOREVER_ABS
Constant used to specify "forever".
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
struct GNUNET_HashCode query
Primary query hash for this request.
void(* GSF_LocalLookupContinuation)(void *cls, struct GSF_PendingRequest *pr, enum GNUNET_BLOCK_EvaluationResult result)
Function to be called after we're done processing replies from the local lookup.
struct GNUNET_TIME_Absolute start
Start time for the operation.
unsigned int replies_seen_count
Number of valid entries in the 'replies_seen' array.
Request persists indefinitely (no expiration).
API to handle pending requests.
Public data (in the sense of not encapsulated within 'gnunet-service-fs_pr', not in the sense of netw...
GNUNET_PEER_Id origin_pid
Identity of the peer that we should never forward this query to since it originated this query (0 for...
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1237
API to manage query plan.
Entry in our priority queue.
Definition: datastore_api.c:96
#define GNUNET_array_grow(arr, size, tsize)
Grow a well-typed (!) array.
unsigned int GNUNET_PEER_Id
A GNUNET_PEER_Id is simply a shorter version of a "struct GNUNET_PeerIdentifier" that can be used ins...
static unsigned long long max_pending_requests
Maximum number of requests (from other peers, overall) that we're willing to have pending at any give...
enum GNUNET_BLOCK_EvaluationResult local_result
Last result from the local datastore lookup evaluation.
void GNUNET_log_config_missing(enum GNUNET_ErrorType kind, const char *section, const char *option)
Log error message about missing configuration option.
static void start_local_query(struct GSF_PendingRequest *pr, uint64_t next_uid, bool random)
static char * value
Value of the record to add/remove.
Valid result, but suppressed because it is a duplicate.
int GSF_pending_request_test_active_(struct GSF_PendingRequest *pr)
Check if the given request is still active.
uint32_t respect
Respect rating for this peer.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
Message to the datastore service asking about specific content.
Definition: datastore.h:136
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:44
uint32_t hash_bitmap
Which of the optional hash codes are present at the end of the message? See GET_MESSAGE_BIT_xx consta...
void * llc_cont_cls
Closure for llc_cont.
void GNUNET_CONTAINER_multihashmap_destroy(struct GNUNET_CONTAINER_MultiHashMap *map)
Destroy a hash map.
struct GSF_PeerPerformanceData * GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
Return the performance data record for the given peer.
unsigned int GSF_datastore_queue_size
Size of the datastore queue we assume for common requests.
enum GNUNET_BLOCK_EvaluationResult GNUNET_BLOCK_evaluate(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, struct GNUNET_BLOCK_Group *group, enum GNUNET_BLOCK_EvaluationOptions eo, const struct GNUNET_HashCode *query, const void *xquery, size_t xquery_size, const void *reply_block, size_t reply_block_size)
Function called to validate a reply or a request.
Definition: block.c:337
struct GNUNET_BLOCK_Context * GSF_block_ctx
Our block context.
struct GNUNET_HashCode query
Hashcodes of the file(s) we're looking for.
non-anonymous file-transfer
Inner block in the CHK tree.
void * GNUNET_CONTAINER_heap_peek(const struct GNUNET_CONTAINER_Heap *heap)
Get element stored at the root of heap.
struct GSF_ConnectedPeer * GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
Get a handle for a connected peer.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:686
#define GET_MESSAGE_BIT_RETURN_TO
The peer identity of a peer waiting for the reply is included (used if the response should be transmi...
Last possible valid result.
static char buf[2048]
#define GNUNET_new_array(n, type)
Allocate a size n array with structs or unions of the given type.
int GNUNET_CONTAINER_multihashmap_remove(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, const void *value)
Remove the given key-value pair from the map.
int(* GSF_PendingRequestIterator)(void *cls, const struct GNUNET_HashCode *key, struct GSF_PendingRequest *pr)
Signature of function called on each request.
void GSF_local_lookup_(struct GSF_PendingRequest *pr, GSF_LocalLookupContinuation cont, void *cont_cls)
Look up the request in the local datastore.
struct GNUNET_TIME_Absolute qe_start
Time we started the last datastore lookup.
void * rh_cls
Closure for rh.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:440
Handle to a node in a heap.
uint32_t respect_offered
How much respect did we (in total) offer for this request so far (estimate, because we might have the...
#define CONTENT_BANDWIDTH_VALUE
Bandwidth value of a 0-priority content (must be fairly high compared to query since content is typic...
enum GNUNET_BLOCK_Type type
Type of the requested block.
int GNUNET_BLOCK_group_serialize(struct GNUNET_BLOCK_Group *bg, uint32_t *nonce, void **raw_data, size_t *raw_data_size)
Serialize state of a block group.
Definition: block.c:179
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
Request must only be processed locally.
Heap with the minimum cost at the root.
struct GNUNET_MessageHeader header
Message type will be GNUNET_MESSAGE_TYPE_FS_PUT.
Definition: fs.h:325
void GNUNET_CONTAINER_heap_destroy(struct GNUNET_CONTAINER_Heap *heap)
Destroys the heap.
Type of a block representing any type of search result (universal).
A 512-bit hashcode.
GSF_PendingRequestReplyHandler rh
Function to call if we encounter a reply.
#define GNUNET_TIME_UNIT_MILLISECONDS
One millisecond.
void GNUNET_DHT_get_stop(struct GNUNET_DHT_GetHandle *get_handle)
Stop async DHT-get.
Definition: dht_api.c:1150
uint32_t type
Desired content type.
Definition: datastore.h:145
static void odc_warn_delay_task(void *cls)
Task that issues a warning if the datastore lookup takes too long.
struct GNUNET_DATASTORE_Handle * GSF_dsh
Our connection to the datastore.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
struct GNUNET_DHT_GetHandle * gh
DHT request handle for this request (or NULL for none).
The block is obtained from the local database, skip cryptographic checks.
void GSF_plan_notify_request_done_(struct GSF_PendingRequest *pr)
Notify the plan about a request being done; destroy all entries associated with this request...
Node in the heap.
unsigned int GNUNET_CONTAINER_heap_get_size(const struct GNUNET_CONTAINER_Heap *heap)
Get the current size of the heap.
Valid result, and there may be more.
#define CADET_RETRY_MAX
If obtaining a block via cadet fails, how often do we retry it before giving up for good (and stickin...
void GSF_cadet_lookup_(struct GSF_PendingRequest *pr)
Consider downloading via cadet (if possible)
int requested
GNUNET_YES if we had a matching request for this block, GNUNET_NO if not.
unsigned int cadet_retry_count
How often have we retried this request via 'cadet'? (used to bound overall retries).
void GSF_pending_request_done_()
Shutdown the subsystem.
Handle for a request that is going out via cadet API.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_min(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the minimum of two relative time values.
Definition: time.c:272
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
#define MAX_RESULTS
Hard limit on the number of results we may get from the datastore per query.
void GNUNET_DHT_get_filter_known_results(struct GNUNET_DHT_GetHandle *get_handle, unsigned int num_results, const struct GNUNET_HashCode *results)
Tell the DHT not to return any of the following known results to this client.
Definition: dht_api.c:1121
int GSF_test_get_load_too_high_(uint32_t priority)
Test if the DATABASE (GET) load on this peer is too high to even consider processing the query at all...
uint32_t original_priority
Priority that this request (originally) had for us.
Values we track for load calculations.
Definition: load.c:35
#define DHT_GET_REPLICATION
Desired replication level for GETs.
GNUNET_BLOCK_EvaluationResult
Possible ways for how a block may relate to a query.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
void GSF_iterate_pending_requests_(GSF_PendingRequestIterator it, void *cls)
Iterate over all pending requests.
indexing for the file-sharing service
int GNUNET_CONTAINER_multihashmap_put(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
struct GNUNET_BLOCK_Group * GNUNET_BLOCK_group_create(struct GNUNET_BLOCK_Context *ctx, enum GNUNET_BLOCK_Type type, uint32_t nonce, const void *raw_data, size_t raw_data_size,...)
Create a new block group.
Definition: block.c:288
uint32_t type
Type of the block (in big endian).
Definition: fs.h:330
Handle to a GET request.
Definition: dht_api.c:78
struct GNUNET_CONTAINER_Heap * GNUNET_CONTAINER_heap_create(enum GNUNET_CONTAINER_HeapOrder order)
Create a new heap.
struct GNUNET_MQ_Envelope * GSF_pending_request_get_message_(struct GSF_PendingRequest *pr)
Generate the message corresponding to the given pending request for transmission to other peers...
static int clean_request(void *cls, const struct GNUNET_HashCode *key, void *value)
Iterator to free pending requests.
Specified block type not supported by this plugin.
void handle_p2p_put(void *cls, const struct PutMessage *put)
Handle P2P "CONTENT" message.
Allow multiple values with the same key.
unsigned int replies_seen_size
Length of the 'replies_seen' array.
int GSF_pending_request_test_target_(struct GSF_PendingRequest *pr, const struct GNUNET_PeerIdentity *target)
Is the given target a legitimate peer for forwarding the given request?
enum GNUNET_BLOCK_Type type
Type of the block.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
static void cadet_reply_proc(void *cls, enum GNUNET_BLOCK_Type type, struct GNUNET_TIME_Absolute expiration, size_t data_size, const void *data)
Function called with a reply from the cadet.
uint32_t anonymity_level
Anonymity requirements for this reply.
struct GNUNET_DATASTORE_QueueEntry * qe
Datastore queue entry for this request (or NULL for none).
static struct GNUNET_LOAD_Value * datastore_put_load
Datastore 'PUT' load tracking.
The identity of the host (wraps the signing key of the peer).
struct GNUNET_LOAD_Value * GSF_rt_entry_lifetime
How long do requests typically stay in the routing table?
struct GSF_PendingRequestData * GSF_pending_request_get_data_(struct GSF_PendingRequest *pr)
Obtain the public data associated with a pending request.
enum GSF_PendingRequestOptions options
Options for the request.
uint64_t first_uid
Unique ID of the first result from the local datastore; used to terminate the loop.
int GNUNET_CONTAINER_multihashmap_get_multiple(struct GNUNET_CONTAINER_MultiHashMap *map, const struct GNUNET_HashCode *key, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map that match a particular key.
#define GNUNET_ALIGN
gcc-ism to force alignment; we use this to align char-arrays that may then be cast to 'struct's...
void GSF_cadet_query_cancel(struct GSF_CadetRequest *sr)
Cancel an active request; must not be called after 'proc' was called.
Request must only be forwarded (no routing)
Performance data kept for a peer.
#define MAX_DATASTORE_QUEUE
Maximum size of the datastore queue for P2P operations.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_min(struct GNUNET_TIME_Absolute t1, struct GNUNET_TIME_Absolute t2)
Return the minimum of two absolute time values.
Definition: time.c:302
struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start(struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, size_t xquery_size, GNUNET_DHT_GetIterator iter, void *iter_cls)
Perform an asynchronous GET operation on the DHT identified.
Definition: dht_api.c:1062
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:373
struct GNUNET_BLOCK_Group * bg
Block group for filtering replies we've already seen.
#define GNUNET_LOAD_value_free(lv)
Free a load value.
struct GNUNET_PeerIdentity origin
Request origin.
enum GNUNET_BLOCK_EvaluationResult eval
Evaluation result (returned).
Query format does not match block type (invalid query).
void GSF_dht_lookup_(struct GSF_PendingRequest *pr)
Consider looking up the data in the DHT (anonymity-level permitting).
void GNUNET_PEER_change_rc(GNUNET_PEER_Id id, int delta)
Change the reference counter of an interned PID.
Definition: peer.c:197
#define GNUNET_TIME_UNIT_YEARS
One year (365 days).
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
struct GNUNET_CONTAINER_HeapNode * hnode
Entry for this pending request in the expiration heap, or NULL.
void(* GSF_PendingRequestReplyHandler)(void *cls, enum GNUNET_BLOCK_EvaluationResult eval, struct GSF_PendingRequest *pr, uint32_t reply_anonymity_level, struct GNUNET_TIME_Absolute expiration, struct GNUNET_TIME_Absolute last_transmission, enum GNUNET_BLOCK_Type type, const void *data, size_t data_len)
Handle a reply to a pending request.
static void process_local_reply(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
We're processing (local) results for a search request from another peer.
Context for put_migration_continuation().
A connected peer.
Data block (leaf) in the CHK tree.
Block group data.
struct GNUNET_CONTAINER_MultiHashMap * GNUNET_CONTAINER_multihashmap_create(unsigned int len, int do_not_copy_keys)
Create a multi hash map.
static void handle_dht_reply(void *cls, struct GNUNET_TIME_Absolute exp, const struct GNUNET_HashCode *key, const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length, enum GNUNET_BLOCK_Type type, size_t size, const void *data)
Iterator called on each result obtained for a DHT operation that expects a reply. ...
struct GNUNET_TIME_Relative migration_delay
If we get content we already have from this peer, for how long do we block it? Adjusted based on the ...
unsigned int GSF_cover_content_count
How many content messages have we received 'recently' that have not yet been claimed as cover traffic...
struct GSF_PendingRequest * GSF_pending_request_create_(enum GSF_PendingRequestOptions options, enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode *query, const struct GNUNET_PeerIdentity *target, const char *bf_data, size_t bf_size, uint32_t mingle, uint32_t anonymity_level, uint32_t priority, int32_t ttl, GNUNET_PEER_Id sender_pid, GNUNET_PEER_Id origin_pid, const struct GNUNET_HashCode *replies_seen, unsigned int replies_seen_count, GSF_PendingRequestReplyHandler rh, void *rh_cls)
Create a new pending request.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
static unsigned int anonymity
uint32_t priority
Priority that this request (still) has for us.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
void GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp, struct GNUNET_TIME_Absolute block_time)
Ask a peer to stop migrating data to us until the given point in time.
An active request.
int GNUNET_FS_handle_on_demand_block(const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid, GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls)
We've received an on-demand encoded block from the datastore.
static struct GNUNET_PeerIdentity pid
Identity of the peer we transmit to / connect to.
struct GSF_CadetRequest * cadet_request
Cadet request handle for this request (or NULL for none).
void GSF_pending_request_cancel_(struct GSF_PendingRequest *pr, int full_cleanup)
Explicitly cancel a pending request.
int GNUNET_CONFIGURATION_get_value_yesno(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option)
Get a configuration value that should be in a set of "YES" or "NO".
int GNUNET_CONTAINER_multihashmap_iterate(struct GNUNET_CONTAINER_MultiHashMap *map, GNUNET_CONTAINER_MulitHashMapIteratorCallback it, void *it_cls)
Iterate over all entries in the map.
int request_found
Did we find a matching request?
uint32_t data
The data value.
void * GNUNET_CONTAINER_heap_remove_node(struct GNUNET_CONTAINER_HeapNode *node)
Removes a node from the heap.
GNUNET_BLOCK_EvaluationOptions
Flags that can be set to control the evaluation.
static size_t data_size
Number of bytes in data.
Query is valid, no reply given.
#define GNUNET_MESSAGE_TYPE_FS_GET
P2P request for content (one FS to another).
struct GNUNET_TIME_Relative GNUNET_TIME_relative_saturating_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Saturating multiply relative time by a given factor.
Definition: time.c:499
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
GSF_PendingRequestOptions
Options for pending requests (bits to be ORed).
GNUNET_PEER_Id sender_pid
Identity of the peer that we should use for the 'sender' (recipient of the response) when forwarding ...
#define DATASTORE_LOAD_AUTODECLINE
At what frequency should our datastore load decrease automatically (since if we don't use it...
Each peer along the way should look at 'enc' (otherwise only the k-peers closest to the key should lo...
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
#define GNUNET_malloc(size)
Wrapper around malloc.
struct GNUNET_HashCode * replies_seen
Array of hash codes of replies we've already seen.
static int active_to_migration
Are we allowed to migrate content to this peer.
#define GNUNET_free(ptr)
Wrapper around free.
static void call_continuation(struct GSF_PendingRequest *pr)
Time for relative time used by GNUnet, in microseconds.
void GNUNET_BLOCK_group_destroy(struct GNUNET_BLOCK_Group *bg)
Destroy resources used by a block group.
Definition: block.c:204
int GNUNET_BLOCK_group_set_seen(struct GNUNET_BLOCK_Group *bg, const struct GNUNET_HashCode *seen_results, unsigned int seen_results_count)
Update block group to filter out the given results.
Definition: block.c:408
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956