GNUnet  0.10.x
gnunet-service-fs_push.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2011, 2016 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_push.h"
32 
33 
/* Maximum number of blocks we keep in memory for migration. */
37 #define MAX_MIGRATION_QUEUE 8
38 
/* Blocks are at most migrated to this number of peers plus one, each time
   they are fetched from the datastore.  Also the length of the target_list
   array scanned by the loops in this file. */
43 #define MIGRATION_LIST_SIZE 2
44 
/* How long must content remain valid for us to consider it for migration?
   Expands to 30 minutes as a struct GNUNET_TIME_Relative. */
52 #define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 30)
53 
54 
63 
68 
73 
78 
84 
/* NOTE(review): this is the tail of a struct whose head and leading members
   are missing from this extract.  Presumably struct MigrationReadyBlock
   (the code below accesses next, query, expiration, target_list, type and
   size on that type) -- confirm against the original file. */
/* Size of the block payload in bytes; the payload is stored directly after
   the struct (the code copies it to and from the address &block[1]). */
88  size_t size;
89 
/* Number of targets already used. */
93  unsigned int used_targets;
94 
99 };
100 
101 
110 
115 
120 
/* NOTE(review): closes a struct whose head and members are all missing from
   this extract.  Presumably struct MigrationReadyPeer (information about a
   peer waiting for migratable data; the code below uses its next, peer and
   env members) -- confirm against the original file. */
125 };
126 
127 
132 
137 
142 
147 
152 
157 
163 
/* NOTE(review): further file-scope variables used below (mig_head, mig_tail,
   peer_head, peer_tail, mig_qe, mig_task, min_migration_delay) are not
   visible in this extract; presumably they were declared on the missing
   listing lines just above. */
/* Size of the doubly-linked list of migration blocks. */
167 static unsigned int mig_size;
168 
/* Is this module enabled?  Compared against GNUNET_YES by the code below. */
172 static int enabled;
173 
/* Did we find anything in the datastore?  While this is GNUNET_NO the
   gathering task is delayed further (see the wait-at-least-5s branch). */
177 static int value_found;
178 
179 
/**
 * Delete the given migration block: decrements mig_size and frees the block.
 *
 * NOTE(review): this listing is an extract with gaps.  The line carrying the
 * function name and parameter list (listing line 186) is missing; the
 * cross-reference text at the end of this file gives the signature as
 * delete_migration_block(struct MigrationReadyBlock *mb).
 *
 * @param mb block to delete; it is released with GNUNET_free, so the caller
 *           must not touch it afterwards
 */
185 static void
187 {
/* NOTE(review): listing line 188 is missing; the two arguments below are
   presumably the tail of a GNUNET_CONTAINER_DLL_remove(mig_head, ...) call
   that unlinks mb from the migration list -- confirm. */
189  mig_tail,
190  mb);
/* NOTE(review): listing lines 191-192 are missing from this extract. */
193  mig_size--;
194  GNUNET_free(mb);
195 }
196 
197 
/**
 * Find content for migration to this peer.  Forward declaration; the
 * function is referenced as a callback (&find_content) before its
 * definition further down.
 *
 * @param cls the struct MigrationReadyPeer to find content for (the
 *            definition converts cls to that type)
 */
203 static void
204 find_content(void *cls);
205 
206 
/**
 * Send the given block to the given peer.
 *
 * Builds a message envelope with block->size extra bytes, copies the block
 * payload into it, records the peer in the first free slot of
 * block->target_list and hands the envelope on for transmission.  If the
 * target list had no free slot left, the block is deleted.
 *
 * NOTE(review): this listing is an extract with gaps.  Listing line 215
 * (function name and first parameter) is missing; the cross-reference text
 * at the end of this file gives the signature as
 * transmit_content(struct MigrationReadyPeer *mrp,
 *                  struct MigrationReadyBlock *block).
 *
 * @param mrp peer to push to; mrp->env must be NULL on entry (asserted)
 * @param block block to push; freed by this call when GNUNET_YES is returned
 * @return GNUNET_YES if the block was deleted, GNUNET_NO if it is still
 *         in the queue
 */
214 static int
216  struct MigrationReadyBlock *block)
217 {
218  struct PutMessage *msg;
219  unsigned int i;
220  struct GSF_PeerPerformanceData *ppd;
221  int ret;
222 
/* NOTE(review): listing line 223 is missing; ppd is read below but no
   assignment is visible, so it is presumably initialised there (likely via
   GSF_get_peer_performance_data_ on mrp->peer) -- confirm. */
224  GNUNET_assert(NULL == mrp->env);
225  mrp->env = GNUNET_MQ_msg_extra(msg,
226  block->size,
/* NOTE(review): listing line 227 (final macro argument, the message type)
   and line 229 are missing from this extract. */
228  msg->type = htonl(block->type);
/* The payload lives directly behind the block header and is copied to
   directly behind the message header. */
230  GNUNET_memcpy(&msg[1],
231  &block[1],
232  block->size);
/* Remember this peer in the first unused (zero) slot of the target list. */
233  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
234  {
235  if (block->target_list[i] == 0)
236  {
237  block->target_list[i] = ppd->pid;
/* NOTE(review): listing line 238 is missing; the argument below presumably
   ends a GNUNET_PEER_change_rc call that takes a reference on the stored
   peer id -- confirm. */
239  1);
240  break;
241  }
242  }
/* i only reaches MIGRATION_LIST_SIZE when the loop found no free slot,
   i.e. every slot was already taken before this send: drop the block.
   The payload was copied above, so the envelope does not depend on it. */
243  if (MIGRATION_LIST_SIZE == i)
244  {
245  delete_migration_block(block);
246  ret = GNUNET_YES;
247  }
248  else
249  {
250  ret = GNUNET_NO;
251  }
/* NOTE(review): listing lines 252 and 255 (the heads of the two calls
   below) are missing.  Presumably the first registers find_content to run
   for mrp once the envelope was sent (GNUNET_MQ_notify_sent) and the second
   is GSF_peer_transmit_ on mrp->peer -- confirm. */
253  &find_content,
254  mrp);
256  GNUNET_NO,
257  0 /* priority */,
258  mrp->env);
259  return ret;
260 }
261 
262 
/**
 * Count the number of peers this block has already been forwarded to.
 *
 * The count is the index of the first zero entry in block->target_list,
 * which relies on slots being filled from the front without holes.
 *
 * NOTE(review): listing line 271 (function name and parameter list) is
 * missing from this extract; the cross-reference text at the end of this
 * file gives the signature as
 * count_targets(struct MigrationReadyBlock *block).
 *
 * @param block block to inspect
 * @return number of used slots, MIGRATION_LIST_SIZE if all are used
 */
270 static unsigned int
272 {
273  unsigned int i;
274 
275  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
276  if (block->target_list[i] == 0)
277  return i;
/* No free slot found: i equals MIGRATION_LIST_SIZE here. */
278  return i;
279 }
280 
281 
/**
 * Check if sending this block to this peer would be a good idea.
 *
 * NOTE(review): listing line 291 (function name and first parameter) is
 * missing from this extract; the cross-reference text at the end of this
 * file gives the signature as
 * score_content(struct MigrationReadyPeer *mrp,
 *               struct MigrationReadyBlock *block).
 *
 * @param mrp target peer
 * @param block the block to score
 * @return -1 if the block was already forwarded to this peer, otherwise
 *         UINT32_MAX minus the hash distance between the block query and
 *         the hash of the peer identity (larger means closer)
 */
290 static long
292  struct MigrationReadyBlock *block)
293 {
294  unsigned int i;
295  struct GSF_PeerPerformanceData *ppd;
296  struct GNUNET_PeerIdentity id;
297  struct GNUNET_HashCode hc;
298  uint32_t dist;
299 
/* NOTE(review): listing line 300 is missing; ppd is presumably initialised
   there from mrp->peer -- confirm. */
/* Never offer a block to a peer that is already in its target list. */
301  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
302  if (block->target_list[i] == ppd->pid)
303  return -1;
304  GNUNET_assert(0 != ppd->pid);
/* NOTE(review): listing line 305 is missing; the argument below presumably
   ends a GNUNET_PEER_resolve call that expands ppd->pid into id -- confirm. */
306  &id);
307  GNUNET_CRYPTO_hash(&id,
308  sizeof(struct GNUNET_PeerIdentity),
309  &hc);
310  dist = GNUNET_CRYPTO_hash_distance_u32(&block->query,
311  &hc);
312  /* closer distance, higher score: */
/* NOTE(review): the difference is an unsigned 32-bit value converted to
   long; where long is only 32 bits wide, values above LONG_MAX do not fit
   and may come out negative, which callers would treat like the -1
   "do not send" result -- confirm whether that matters on supported
   platforms. */
313  return UINT32_MAX - dist;
314 }
315 
316 
/**
 * If the migration task is not currently running, consider (re)scheduling
 * it with the appropriate delay.  Forward declaration; the definition
 * follows further down in this file.
 */
321 static void
322 consider_gathering(void);
323 
324 
/**
 * Find content for migration to this peer.
 *
 * Scores every queued block for the peer and transmits the best one.  If
 * no block qualifies, either waits for the queue to fill or evicts the
 * block that was already forwarded to the most peers.
 *
 * Passed as the callback &find_content by transmit_content and called
 * directly by GSF_push_start_; clears mrp->env on entry.
 *
 * @param cls the struct MigrationReadyPeer to find content for
 */
330 static void
331 find_content(void *cls)
332 {
333  struct MigrationReadyPeer *mrp = cls;
334  struct MigrationReadyBlock *pos;
335  long score;
336  long best_score;
337  struct MigrationReadyBlock *best;
338 
339  mrp->env = NULL;
340  best = NULL;
/* -1 is what score_content returns for "already sent to this peer", so
   only blocks with a score of 0 or more can become best. */
341  best_score = -1;
342  pos = mig_head;
343  while (NULL != pos)
344  {
345  score = score_content(mrp, pos);
346  if (score > best_score)
347  {
348  best_score = score;
349  best = pos;
350  }
351  pos = pos->next;
352  }
353  if (NULL == best)
354  {
/* NOTE(review): listing line 355 (the condition guarding the block below)
   and the heads of the logging calls (lines 357, 361, 382) are missing from
   this extract.  Judging by the log texts the condition presumably tests
   that the queue is not yet full -- confirm. */
356  {
358  "No content found for pushing, waiting for queue to fill\n");
359  return; /* will fill up eventually... */
360  }
362  "No suitable content found, purging content from full queue\n");
363  /* failed to find migration target AND
364  * queue is full, purge most-forwarded
365  * block from queue to make room for more */
366  pos = mig_head;
/* best_score is still -1 here; with the >= comparison the last block among
   those with the highest target count is selected. */
367  while (NULL != pos)
368  {
369  score = count_targets(pos);
370  if (score >= best_score)
371  {
372  best_score = score;
373  best = pos;
374  }
375  pos = pos->next;
376  }
377  GNUNET_assert(NULL != best);
/* NOTE(review): listing lines 378-379 are missing; presumably they delete
   the selected block and re-trigger gathering -- confirm. */
380  return;
381  }
383  "Preparing to push best content to peer\n");
/* The return value (whether the block was freed) is not needed here. */
384  transmit_content(mrp,
385  best);
386 }
387 
388 
/**
 * Task that is run periodically to obtain blocks for content migration.
 * Forward declaration; the definition follows further down in this file.
 *
 * @param cls closure; the visible scheduling call passes NULL as its last
 *            argument
 */
395 static void
396 gather_migration_blocks(void *cls);
397 
398 
/**
 * If the migration task is not currently running, consider (re)scheduling
 * it with the appropriate delay.
 *
 * Does nothing when there is no datastore handle, when a datastore request
 * is already pending (mig_qe) or when the task is already scheduled
 * (mig_task).
 *
 * NOTE(review): this listing is an extract with gaps.  Listing line 404
 * (the function name; the forward declaration names it
 * consider_gathering(void)) and large parts of the delay computation
 * (lines 414, 416, 419, 421, 426, 429, 433) are missing.
 */
403 static void
405 {
406  struct GNUNET_TIME_Relative delay;
407 
408  if (NULL == GSF_dsh)
409  return;
410  if (NULL != mig_qe)
411  return;
412  if (NULL != mig_task)
413  return;
/* NOTE(review): the condition for the return below (listing line 414) is
   missing; presumably it stops gathering once the queue is full -- confirm. */
415  return;
/* NOTE(review): the start of the delay computation (listing line 416) is
   missing; only fragments are visible.  The delay depends on mig_size, is
   divided by a factor and is bounded from below via
   GNUNET_TIME_relative_max; the exact operands are not shown here. */
417  mig_size);
418  delay = GNUNET_TIME_relative_divide(delay,
420  delay = GNUNET_TIME_relative_max(delay,
422  if (GNUNET_NO == value_found)
423  {
424  /* wait at least 5s if the datastore is empty */
425  delay = GNUNET_TIME_relative_max(delay,
427  5));
428  }
430  "Scheduling gathering task (queue size: %u)\n",
431  mig_size);
/* NOTE(review): the task function argument (listing line 433) is missing;
   presumably &gather_migration_blocks -- confirm. */
432  mig_task = GNUNET_SCHEDULER_add_delayed(delay,
434  NULL);
435 }
436 
437 
/**
 * Process content offered for migration.
 *
 * Clears the pending datastore request, filters out content that expires
 * too soon, hands on-demand blocks to GNUNET_FS_handle_on_demand_block,
 * and otherwise copies the block into a new queue entry and offers it to
 * the waiting peers.
 *
 * NOTE(review): this listing is an extract with gaps.  Listing line 454
 * (function name and the cls parameter) and line 462 (the expiration
 * parameter) are missing; the cross-reference text at the end of this file
 * gives the full signature of process_migration_content.  Several call
 * heads inside the body are missing as well and are flagged below.
 *
 * @param cls closure (not visible in this extract)
 * @param key key for the content, NULL if the datastore found nothing
 * @param size number of bytes in data
 * @param data content stored
 * @param type type of the content
 * @param priority priority of the content
 * @param anonymity anonymity-level for the content
 * @param replication replication-level for the content
 * @param expiration expiration time for the content
 * @param uid unique identifier for the datum
 */
453 static void
455  const struct GNUNET_HashCode *key,
456  size_t size,
457  const void *data,
458  enum GNUNET_BLOCK_Type type,
459  uint32_t priority,
460  uint32_t anonymity,
461  uint32_t replication,
463  uint64_t uid)
464 {
465  struct MigrationReadyBlock *mb;
466  struct MigrationReadyPeer *pos;
467 
/* The datastore request that led to this call is finished. */
468  mig_qe = NULL;
469  if (NULL == key)
470  {
/* NOTE(review): listing lines 471 and 473 are missing (log call head and,
   presumably, a call that reschedules gathering) -- confirm. */
472  "No content found for migration...\n");
474  return;
475  }
/* NOTE(review): listing line 476 is missing from this extract. */
477  if (GNUNET_TIME_absolute_get_remaining(expiration).rel_value_us <
478  MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
479  {
480  /* content will expire soon, don't bother */
/* NOTE(review): listing line 481 is missing from this extract. */
482  return;
483  }
484  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
485  {
/* NOTE(review): listing lines 487, 496 and 498 are missing.  The arguments
   below match the parameter list of GNUNET_FS_handle_on_demand_block, whose
   last two parameters are a continuation and its closure; which
   continuation is passed is not visible here.  In the visible code the
   function returns after this call in either case. */
486  if (GNUNET_OK !=
488  size,
489  data,
490  type,
491  priority,
492  anonymity,
493  replication,
494  expiration,
495  uid,
497  NULL))
499  return;
500  }
/* NOTE(review): listing lines 501 and 505 (log call head and its last
   argument) are missing from this extract. */
502  "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
503  GNUNET_h2s(key),
504  type, mig_size + 1,
/* One allocation holds the queue entry followed by a copy of the payload. */
506  mb = GNUNET_malloc(sizeof(struct MigrationReadyBlock) + size);
507  mb->query = *key;
508  mb->expiration = expiration;
509  mb->size = size;
510  mb->type = type;
511  GNUNET_memcpy(&mb[1], data, size);
/* NOTE(review): listing line 512 is missing; the arguments below presumably
   belong to GNUNET_CONTAINER_DLL_insert_after(mig_head, ...), which would
   append mb behind the current tail -- confirm. */
513  mig_tail,
514  mig_tail,
515  mb);
516  mig_size++;
/* Offer the new block to every peer that has no message in flight. */
517  for (pos = peer_head; NULL != pos; pos = pos->next)
518  {
/* NOTE(review): listing lines 519, 521 and 523 are missing (log call head,
   its peer argument, and the head of the call in the condition below;
   presumably a transmit_content call whose result signals that mb was
   deleted) -- confirm. */
520  "Preparing to push best content to peer %s\n",
522  if ((NULL == pos->env) &&
524  mb)))
525  {
526  break; /* 'mb' was freed! */
527  }
528  }
/* NOTE(review): listing line 529 is missing from this extract. */
530 }
531 
532 
/**
 * Task that is run periodically to obtain blocks for content migration.
 *
 * Clears mig_task and, if a datastore handle exists, asks the datastore
 * for content for replication; the request handle is kept in mig_qe.
 *
 * NOTE(review): this listing is an extract with gaps.  Listing line 540
 * (function name and parameter; the forward declaration names it
 * gather_migration_blocks(void *cls)) and several other lines are missing.
 */
539 static void
541 {
/* The task is running now, so it is no longer scheduled. */
542  mig_task = NULL;
/* NOTE(review): the condition for the return below (listing line 543) is
   missing from this extract. */
544  return;
545  if (NULL == GSF_dsh)
546  return;
/* NOTE(review): listing lines 547, 550-551 and 554 are missing (log call
   head, the head of the datastore call that presumably assigns mig_qe via
   GNUNET_DATASTORE_get_for_replication, and its processor argument). */
548  "Asking datastore for content for replication (queue size: %u)\n",
549  mig_size);
552  0,
553  UINT_MAX,
555  NULL);
/* NOTE(review): the statement executed when the request could not be queued
   (listing line 557) is missing from this extract. */
556  if (NULL == mig_qe)
558 }
559 
560 
/**
 * A peer connected to us.  Start pushing content to this peer.
 *
 * Does nothing unless the module is enabled.  Adding a peer that is already
 * in the list is reported via GNUNET_break and otherwise ignored.
 *
 * NOTE(review): listing line 568 (function name and parameter list) is
 * missing from this extract; the cross-reference text at the end of this
 * file gives the signature as
 * GSF_push_start_(struct GSF_ConnectedPeer *peer).
 *
 * @param peer handle for the peer that connected
 */
567 void
569 {
570  struct MigrationReadyPeer *mrp;
571 
572  if (GNUNET_YES != enabled)
573  return;
/* Look for an existing entry for this peer. */
574  for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
575  if (mrp->peer == peer)
576  break;
577  if (NULL != mrp)
578  {
579  /* same peer added twice, must not happen */
580  GNUNET_break(0);
581  return;
582  }
583 
/* NOTE(review): listing lines 584 and 586 (log call head and its peer
   argument) are missing from this extract. */
585  "Adding peer %s to list for pushing\n",
587 
588  mrp = GNUNET_new(struct MigrationReadyPeer);
589  mrp->peer = peer;
/* Content is looked up for the new entry before it is linked into the
   peer list. */
590  find_content(mrp);
591  GNUNET_CONTAINER_DLL_insert(peer_head,
592  peer_tail,
593  mrp);
594 }
595 
596 
/**
 * A peer disconnected from us.  Stop pushing content to this peer.
 *
 * Removes the entry for the peer from the peer list and frees it; does
 * nothing if the peer has no entry.
 *
 * NOTE(review): listing line 604 (function name and parameter list) is
 * missing from this extract; the cross-reference text at the end of this
 * file gives the signature as
 * GSF_push_stop_(struct GSF_ConnectedPeer *peer).
 *
 * @param peer handle for the peer that disconnected
 */
603 void
605 {
606  struct MigrationReadyPeer *pos;
607 
608  for (pos = peer_head; NULL != pos; pos = pos->next)
609  if (pos->peer == peer)
610  break;
611  if (NULL == pos)
612  return;
/* NOTE(review): the statement guarded by the condition below (listing line
   614) is missing; presumably it cancels the envelope still in flight
   (GNUNET_MQ_send_cancel on pos->env) -- confirm.  As printed here, the
   condition would appear to guard the list removal, which is misleading. */
613  if (NULL != pos->env)
615  GNUNET_CONTAINER_DLL_remove(peer_head,
616  peer_tail,
617  pos);
618  GNUNET_free(pos);
619 }
620 
621 
/**
 * Setup the module.
 *
 * Reads the yes/no option CONTENT_PUSHING from section FS into enabled and
 * stops if it is not GNUNET_YES.  Then reads the time option
 * MIN_MIGRATION_DELAY from section fs; if that fails, an invalid
 * configuration message is logged and setup stops.
 *
 * NOTE(review): this listing is an extract with gaps.  Listing line 626
 * (the function name; the cross-reference text at the end of this file
 * lists it as GSF_push_init_()) and the heads of the configuration and
 * logging calls (lines 629, 636, 639, 641) are missing.
 */
625 void
627 {
628  enabled =
630  "FS",
631  "CONTENT_PUSHING");
632  if (GNUNET_YES != enabled)
633  return;
634 
635  if (GNUNET_OK !=
637  "fs",
638  "MIN_MIGRATION_DELAY",
640  {
/* NOTE(review): the message says pushing is disabled, but the visible code
   does not reset enabled on this path, so GSF_push_start_ keeps adding
   peers -- confirm whether that is intended. */
642  "fs",
643  "MIN_MIGRATION_DELAY",
644  _("time required, content pushing disabled"));
645  return;
646  }
/* NOTE(review): listing line 647 is missing; presumably it starts the first
   gathering round -- confirm. */
648 }
649 
650 
/**
 * Shutdown the module.
 *
 * Cancels the scheduled gathering task and the pending datastore request,
 * if any, then deletes every queued migration block.
 *
 * NOTE(review): listing line 655 (the function name; the cross-reference
 * text at the end of this file lists it as GSF_push_done_()) is missing
 * from this extract.  Entries of the peer list are not released here;
 * presumably GSF_push_stop_ is called for each peer beforehand -- confirm.
 */
654 void
656 {
657  if (NULL != mig_task)
658  {
659  GNUNET_SCHEDULER_cancel(mig_task);
660  mig_task = NULL;
661  }
662  if (NULL != mig_qe)
663  {
664  GNUNET_DATASTORE_cancel(mig_qe);
665  mig_qe = NULL;
666  }
/* delete_migration_block unlinks and frees the head, so the loop ends once
   the list is empty. */
667  while (NULL != mig_head)
668  delete_migration_block(mig_head);
/* The counter must agree with the now empty list. */
669  GNUNET_assert(0 == mig_size);
670 }
671 
672 /* end of gnunet-service-fs_push.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
API to handle 'connected peers'.
static long score_content(struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
Check if sending this block to this peer would be a good idea.
#define MIGRATION_LIST_SIZE
Blocks are at most migrated to this number of peers plus one, each time they are fetched from the dat...
struct GSF_ConnectedPeer * peer
Handle to peer.
#define MAX_MIGRATION_QUEUE
Maximum number of blocks we keep in memory for migration.
int GNUNET_CONFIGURATION_get_value_time(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, struct GNUNET_TIME_Relative *time)
Get a configuration value that should be a relative time.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Envelope of the currently pushed message.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
void GSF_push_stop_(struct GSF_ConnectedPeer *peer)
A peer disconnected from us.
static struct GNUNET_DATASTORE_QueueEntry * mig_qe
Request to datastore for migration (or NULL).
Response from FS service with a result for a previous FS search.
Definition: fs.h:321
struct GNUNET_TIME_AbsoluteNBO expiration
When does this result expire?
Definition: fs.h:335
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
struct GNUNET_TIME_Absolute expiration
When does this block expire?
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:287
static struct MigrationReadyPeer * peer_tail
Tail of linked list of peers.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
void GSF_peer_transmit_(struct GSF_ConnectedPeer *cp, int is_query, uint32_t priority, struct GNUNET_MQ_Envelope *env)
Transmit a message to the given peer as soon as possible.
#define GNUNET_TIME_UNIT_SECONDS
One second.
static unsigned int replication
Information about a peer waiting for migratable data.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static int transmit_content(struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
Send the given block to the given peer.
#define GNUNET_NO
Definition: gnunet_common.h:78
enum GNUNET_BLOCK_Type type
Type of the block.
shared data structures of gnunet-service-fs.c
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static struct MigrationReadyBlock * mig_head
Head of linked list of blocks that can be migrated.
void GNUNET_log_config_invalid(enum GNUNET_ErrorType kind, const char *section, const char *option, const char *required)
Log error message about invalid configuration option value.
static struct GNUNET_SCHEDULER_Task * mig_task
ID of task that collects blocks for migration.
static int enabled
Is this module enabled?
uint32_t GNUNET_CRYPTO_hash_distance_u32(const struct GNUNET_HashCode *a, const struct GNUNET_HashCode *b)
Compute the distance between 2 hashcodes.
Definition: crypto_hash.c:121
static struct GNUNET_TIME_Relative min_migration_delay
What is the maximum frequency at which we are allowed to poll the datastore for migration content...
Block that is ready for migration to other peers.
static struct MigrationReadyBlock * mig_tail
Tail of linked list of blocks that can be migrated.
static int ret
Final status code.
Definition: gnunet-arm.c:89
void GNUNET_PEER_resolve(GNUNET_PEER_Id id, struct GNUNET_PeerIdentity *pid)
Convert an interned PID to a normal peer identity.
Definition: peer.c:225
const struct GNUNET_CONFIGURATION_Handle * GSF_cfg
Our configuration.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
Type of a block representing a block to be encoded on demand from disk.
static void delete_migration_block(struct MigrationReadyBlock *mb)
Delete the given migration block.
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
static void consider_gathering(void)
If the migration task is not currently running, consider (re)scheduling it with the appropriate delay...
void GSF_push_init_()
Setup the module.
struct MigrationReadyPeer * prev
This is a doubly-linked list.
GNUNET_PEER_Id target_list[2]
Peers we already forwarded this block to.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1237
#define MIN_MIGRATION_CONTENT_LIFETIME
How long must content remain valid for us to consider it for migration? If content will expire too so...
Entry in our priority queue.
Definition: datastore_api.c:96
unsigned int GNUNET_PEER_Id
A GNUNET_PEER_Id is simply a shorter version of a "struct GNUNET_PeerIdentifier" that can be used ins...
unsigned int used_targets
Number of targets already used.
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:772
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:44
struct MigrationReadyBlock * next
This is a doubly-linked list.
struct GSF_PeerPerformanceData * GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
Return the performance data record for the given peer.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:440
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
static int value_found
Did we find anything in the datastore?
A 512-bit hashcode.
struct GNUNET_DATASTORE_Handle * GSF_dsh
Our connection to the datastore.
void GNUNET_PEER_decrement_rcs(const GNUNET_PEER_Id *ids, unsigned int count)
Decrement multiple RCs of peer identities by one.
Definition: peer.c:162
struct MigrationReadyPeer * next
This is a doubly-linked list.
struct GNUNET_TESTBED_Peer * peer
The peer associated with this model.
struct GNUNET_HashCode key
The key used in the DHT.
static void process_migration_content(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process content offered for migration.
indexing for the file-sharing service
uint32_t type
Type of the block (in big endian).
Definition: fs.h:330
GNUNET_PEER_Id pid
The peer's identity (interned version).
void GSF_push_start_(struct GSF_ConnectedPeer *peer)
A peer connected to us.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
The identity of the host (wraps the signing key of the peer).
static unsigned int mig_size
Size of the doubly-linked list of migration blocks.
Performance data kept for a peer.
const struct GNUNET_PeerIdentity * GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
Obtain the identity of a connected peer.
void GNUNET_PEER_change_rc(GNUNET_PEER_Id id, int delta)
Change the reference counter of an interned PID.
Definition: peer.c:197
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
A connected peer.
void GSF_push_done_()
Shutdown the module.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
struct GNUNET_TIME_Relative GNUNET_TIME_relative_divide(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Divide relative time by a given factor.
Definition: time.c:525
static unsigned int count_targets(struct MigrationReadyBlock *block)
Count the number of peers this block has already been forwarded to.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
static unsigned int anonymity
struct GNUNET_HashCode query
Query for the block.
static void find_content(void *cls)
Find content for migration to this peer.
int GNUNET_FS_handle_on_demand_block(const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid, GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls)
We've received an on-demand encoded block from the datastore.
support for pushing out content
int GNUNET_CONFIGURATION_get_value_yesno(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option)
Get a configuration value that should be in a set of "YES" or "NO".
uint32_t data
The data value.
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:655
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
static void gather_migration_blocks(void *cls)
Task that is run periodically to obtain blocks for content migration.
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:913
#define GNUNET_MESSAGE_TYPE_FS_PUT
P2P response with content or active migration of content.
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
static struct MigrationReadyPeer * peer_head
Head of linked list of peers.
size_t size
Size of the block.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
struct MigrationReadyBlock * prev
This is a doubly-linked list.
Time for relative time used by GNUnet, in microseconds.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956