GNUnet debian-0.24.3-29-g453fda2cf
 
gnunet-service-fs_push.c File Reference

API to push content from our datastore to other peers ('anonymous'-content P2P migration)


Data Structures

struct  MigrationReadyBlock
 Block that is ready for migration to other peers.
 
struct  MigrationReadyPeer
 Information about a peer waiting for migratable data.
 

Macros

#define MAX_MIGRATION_QUEUE   8
 Maximum number of blocks we keep in memory for migration.
 
#define MIGRATION_LIST_SIZE   2
 Blocks are at most migrated to this number of peers plus one, each time they are fetched from the database.
 
#define MIN_MIGRATION_CONTENT_LIFETIME
 How long must content remain valid for us to consider it for migration? If content will expire too soon, there is clearly no point in pushing it to other peers.
 

Functions

static void delete_migration_block (struct MigrationReadyBlock *mb)
 Delete the given migration block.
 
static void find_content (void *cls)
 Find content for migration to this peer.
 
static int transmit_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
 Send the given block to the given peer.
 
static unsigned int count_targets (struct MigrationReadyBlock *block)
 Count the number of peers this block has already been forwarded to.
 
static long score_content (struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
 Check if sending this block to this peer would be a good idea.
 
static void consider_gathering (void)
 If the migration task is not currently running, consider (re)scheduling it with the appropriate delay.
 
static void gather_migration_blocks (void *cls)
 Task that is run periodically to obtain blocks for content migration.
 
static void process_migration_content (void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
 Process content offered for migration.
 
void GSF_push_start_ (struct GSF_ConnectedPeer *peer)
 A peer connected to us.
 
void GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
 A peer disconnected from us.
 
void GSF_push_init_ ()
 Setup the module.
 
void GSF_push_done_ ()
 Shutdown the module.
 

Variables

static struct MigrationReadyBlock * mig_head
 Head of linked list of blocks that can be migrated.
 
static struct MigrationReadyBlock * mig_tail
 Tail of linked list of blocks that can be migrated.
 
static struct MigrationReadyPeer * peer_head
 Head of linked list of peers.
 
static struct MigrationReadyPeer * peer_tail
 Tail of linked list of peers.
 
static struct GNUNET_DATASTORE_QueueEntry * mig_qe
 Request to datastore for migration (or NULL).
 
static struct GNUNET_SCHEDULER_Task * mig_task
 ID of task that collects blocks for migration.
 
static struct GNUNET_TIME_Relative min_migration_delay
 What is the maximum frequency at which we are allowed to poll the datastore for migration content?
 
static unsigned int mig_size
 Size of the doubly-linked list of migration blocks.
 
static int enabled
 Is this module enabled?
 
static int value_found
 Did we find anything in the datastore?
 

Detailed Description

API to push content from our datastore to other peers ('anonymous'-content P2P migration)

Author
Christian Grothoff

Definition in file gnunet-service-fs_push.c.

Macro Definition Documentation

◆ MAX_MIGRATION_QUEUE

#define MAX_MIGRATION_QUEUE   8

Maximum number of blocks we keep in memory for migration.

Definition at line 37 of file gnunet-service-fs_push.c.

◆ MIGRATION_LIST_SIZE

#define MIGRATION_LIST_SIZE   2

Blocks are at most migrated to this number of peers plus one, each time they are fetched from the database.

Definition at line 43 of file gnunet-service-fs_push.c.

◆ MIN_MIGRATION_CONTENT_LIFETIME

#define MIN_MIGRATION_CONTENT_LIFETIME
Value:
A struct GNUNET_TIME_Relative built with GNUNET_TIME_relative_multiply() (multiply relative time by a given factor) from GNUNET_TIME_UNIT_MINUTES (one minute).
How long must content remain valid for us to consider it for migration? If content will expire too soon, there is clearly no point in pushing it to other peers.

This value gives the threshold for migration. Note that if this value is increased, the migration testcase may need to be adjusted as well (especially the CONTENT_LIFETIME in fs_test_lib.c).

Definition at line 52 of file gnunet-service-fs_push.c.
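
The threshold test described above can be sketched as follows. This is a minimal illustration, not the GNUnet code: `rel_us`, `remaining_us()`, `worth_migrating()` and the `min_lifetime_us` parameter are hypothetical names, and times are modelled as plain microsecond counters instead of the GNUNET_TIME types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a relative time in microseconds. */
typedef uint64_t rel_us;

/* Time left until expiration_us, as seen at now_us.
   Content that has already expired has no lifetime left. */
static rel_us
remaining_us (uint64_t now_us, uint64_t expiration_us)
{
  return (expiration_us > now_us) ? (expiration_us - now_us) : 0;
}

/* True if the content stays valid for at least min_lifetime_us,
   i.e. it is not below the migration threshold. */
static bool
worth_migrating (uint64_t now_us, uint64_t expiration_us,
                 rel_us min_lifetime_us)
{
  assert (min_lifetime_us > 0);
  return remaining_us (now_us, expiration_us) >= min_lifetime_us;
}
```

Content whose remaining lifetime falls below the threshold is simply skipped, so raising the threshold shrinks the set of blocks that qualify for migration.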

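struct MigrationReadyBlock
{
  /* This is a doubly-linked list. */
  struct MigrationReadyBlock *next;

  /* This is a doubly-linked list. */
  struct MigrationReadyBlock *prev;

  /* Query for the block. */
  struct GNUNET_HashCode query;

  /* When does this block expire? */
  struct GNUNET_TIME_Absolute expiration;

  /* Peers we already forwarded this block to. */
  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];

  /* Size of the block. */
  size_t size;

  unsigned int used_targets;

  /* Type of the block. */
  enum GNUNET_BLOCK_Type type;
};


struct MigrationReadyPeer
{
  /* This is a doubly-linked list. */
  struct MigrationReadyPeer *next;

  /* This is a doubly-linked list. */
  struct MigrationReadyPeer *prev;

  /* Handle to peer. */
  struct GSF_ConnectedPeer *peer;

  /* Envelope of the currently pushed message. */
  struct GNUNET_MQ_Envelope *env;
};


/* Head of linked list of blocks that can be migrated. */
static struct MigrationReadyBlock *mig_head;

/* Tail of linked list of blocks that can be migrated. */
static struct MigrationReadyBlock *mig_tail;

/* Head of linked list of peers. */
static struct MigrationReadyPeer *peer_head;

/* Tail of linked list of peers. */
static struct MigrationReadyPeer *peer_tail;

/* Request to datastore for migration (or NULL). */
static struct GNUNET_DATASTORE_QueueEntry *mig_qe;

/* ID of task that collects blocks for migration. */
static struct GNUNET_SCHEDULER_Task *mig_task;

/* What is the maximum frequency at which we are allowed to
   poll the datastore for migration content? */
static struct GNUNET_TIME_Relative min_migration_delay;

/* Size of the doubly-linked list of migration blocks. */
static unsigned int mig_size;

/* Is this module enabled? */
static int enabled;

/* Did we find anything in the datastore? */
static int value_found;


static void
delete_migration_block (struct MigrationReadyBlock *mb)
{
  GNUNET_CONTAINER_DLL_remove (mig_head,
                               mig_tail,
                               mb);
  GNUNET_PEER_decrement_rcs (mb->target_list,
                             MIGRATION_LIST_SIZE);
  mig_size--;
  GNUNET_free (mb);
}
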
void
GSF_push_init_ ()
{
  enabled =
    GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
                                          "FS",
                                          "CONTENT_PUSHING");
  if (GNUNET_YES != enabled)
    return;

  if (GNUNET_OK !=
      GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
                                           "fs",
                                           "MIN_MIGRATION_DELAY",
                                           &min_migration_delay))
  {
    GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
                               "fs",
                               "MIN_MIGRATION_DELAY",
                               _ ("time required, content pushing disabled"));
    return;
  }
  consider_gathering ();
}


void
GSF_push_done_ ()
{
  if (NULL != mig_task)
  {
    GNUNET_SCHEDULER_cancel (mig_task);
    mig_task = NULL;
  }
  if (NULL != mig_qe)
  {
    GNUNET_DATASTORE_cancel (mig_qe);
    mig_qe = NULL;
  }
  while (NULL != mig_head)
    delete_migration_block (mig_head);
  GNUNET_assert (0 == mig_size);
}


/* end of gnunet-service-fs_push.c */

Function Documentation

◆ delete_migration_block()

static void delete_migration_block (struct MigrationReadyBlock *mb)

Delete the given migration block.

Parameters
mb    block to delete

Definition at line 189 of file gnunet-service-fs_push.c.

References GNUNET_CONTAINER_DLL_remove, GNUNET_free, GNUNET_PEER_decrement_rcs(), mig_head, mig_size, mig_tail, MIGRATION_LIST_SIZE, and MigrationReadyBlock::target_list.

Referenced by find_content(), GSF_push_done_(), and transmit_content().


◆ find_content()

static void find_content (void *cls)

Find content for migration to this peer.

Parameters
cls    a struct MigrationReadyPeer * to find content for

Definition at line 329 of file gnunet-service-fs_push.c.

static void
find_content (void *cls)
{
  struct MigrationReadyPeer *mrp = cls;
  struct MigrationReadyBlock *pos;
  long score;
  long best_score;
  struct MigrationReadyBlock *best;

  mrp->env = NULL;
  best = NULL;
  best_score = -1;
  pos = mig_head;
  while (NULL != pos)
  {
    score = score_content (mrp, pos);
    if (score > best_score)
    {
      best_score = score;
      best = pos;
    }
    pos = pos->next;
  }
  if (NULL == best)
  {
    if (mig_size < MAX_MIGRATION_QUEUE)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "No content found for pushing, waiting for queue to fill\n");
      return;   /* will fill up eventually... */
    }
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "No suitable content found, purging content from full queue\n");
    /* failed to find migration target AND
     * queue is full, purge most-forwarded
     * block from queue to make room for more */
    pos = mig_head;
    while (NULL != pos)
    {
      score = count_targets (pos);
      if (score >= best_score)
      {
        best_score = score;
        best = pos;
      }
      pos = pos->next;
    }
    GNUNET_assert (NULL != best);
    delete_migration_block (best);
    consider_gathering ();
    return;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Preparing to push best content to peer\n");
  transmit_content (mrp,
                    best);
}

References consider_gathering(), count_targets(), delete_migration_block(), MigrationReadyPeer::env, GNUNET_assert, GNUNET_ERROR_TYPE_DEBUG, GNUNET_log, MAX_MIGRATION_QUEUE, mig_head, mig_size, MigrationReadyBlock::next, score_content(), and transmit_content().

Referenced by GSF_push_start_(), and transmit_content().
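
The two selection passes in find_content() can be illustrated in isolation. The sketch below is not the GNUnet code: it works on plain arrays instead of the migration queue, and `pick_best()` and `pick_victim()` are hypothetical names. It only mirrors the comparison rules visible in the listing (strict `>` for the best block, `>=` for the block to purge).

```c
#include <assert.h>
#include <stddef.h>

/* Index of the entry with the highest non-negative score, or -1 if
   every score is negative. On ties the earliest entry wins because
   the comparison is strict. */
static int
pick_best (const long *scores, size_t n)
{
  long best_score = -1;
  int best = -1;

  assert (NULL != scores || 0 == n);
  for (size_t i = 0; i < n; i++)
    if (scores[i] > best_score)
    {
      best_score = scores[i];
      best = (int) i;
    }
  return best;
}

/* Index of the entry that was forwarded most often, or -1 for an
   empty array. On ties the latest entry wins because the comparison
   is non-strict. */
static int
pick_victim (const unsigned int *forwarded, size_t n)
{
  long best_score = -1;
  int best = -1;

  assert (NULL != forwarded || 0 == n);
  for (size_t i = 0; i < n; i++)
    if ((long) forwarded[i] >= best_score)
    {
      best_score = (long) forwarded[i];
      best = (int) i;
    }
  return best;
}
```

Because the purge pass uses `>=` starting from -1, it always selects some entry of a non-empty queue, which is what the assertion on `best` in the listing relies on.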


◆ transmit_content()

static int transmit_content (struct MigrationReadyPeer *mrp,
                             struct MigrationReadyBlock *block)

Send the given block to the given peer.

Parameters
mrp      target peer
block    the block
Returns
GNUNET_YES if the block was deleted (!)

Definition at line 218 of file gnunet-service-fs_push.c.

static int
transmit_content (struct MigrationReadyPeer *mrp,
                  struct MigrationReadyBlock *block)
{
  struct PutMessage *msg;
  unsigned int i;
  struct GSF_PeerPerformanceData *ppd;
  int ret;

  ppd = GSF_get_peer_performance_data_ (mrp->peer);
  GNUNET_assert (NULL == mrp->env);
  mrp->env = GNUNET_MQ_msg_extra (msg,
                                  block->size,
                                  GNUNET_MESSAGE_TYPE_FS_PUT);
  msg->type = htonl (block->type);
  msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
  GNUNET_memcpy (&msg[1],
                 &block[1],
                 block->size);
  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
  {
    if (block->target_list[i] == 0)
    {
      block->target_list[i] = ppd->pid;
      GNUNET_PEER_change_rc (block->target_list[i],
                             1);
      break;
    }
  }
  if (MIGRATION_LIST_SIZE == i)
  {
    delete_migration_block (block);
    ret = GNUNET_YES;
  }
  else
  {
    ret = GNUNET_NO;
  }
  GNUNET_MQ_notify_sent (mrp->env,
                         &find_content,
                         mrp);
  GSF_peer_transmit_ (mrp->peer,
                      GNUNET_NO,
                      0 /* priority */,
                      mrp->env);
  return ret;
}

References delete_migration_block(), MigrationReadyPeer::env, MigrationReadyBlock::expiration, find_content(), GNUNET_assert, GNUNET_memcpy, GNUNET_MESSAGE_TYPE_FS_PUT, GNUNET_MQ_msg_extra, GNUNET_MQ_notify_sent(), GNUNET_NO, GNUNET_PEER_change_rc(), GNUNET_TIME_absolute_hton(), GNUNET_YES, GSF_get_peer_performance_data_(), GSF_peer_transmit_(), MIGRATION_LIST_SIZE, msg, MigrationReadyPeer::peer, GSF_PeerPerformanceData::pid, ret, MigrationReadyBlock::size, MigrationReadyBlock::target_list, GNUNET_MessageHeader::type, and MigrationReadyBlock::type.

Referenced by find_content(), and process_migration_content().
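
The bookkeeping around `target_list` explains why a block reaches at most MIGRATION_LIST_SIZE plus one peers. The sketch below is a simplified model, not the GNUnet code: `block_targets`, `count_used()` and `record_target()` are hypothetical names, and reference counting of interned peer IDs is left out.

```c
#include <assert.h>
#include <stddef.h>

#define LIST_SIZE 2              /* mirrors MIGRATION_LIST_SIZE */

typedef unsigned int peer_id;    /* 0 marks an empty slot */

struct block_targets
{
  peer_id target_list[LIST_SIZE];
};

/* Number of used slots: the index of the first empty slot,
   or LIST_SIZE if the list is full. */
static unsigned int
count_used (const struct block_targets *b)
{
  unsigned int i;

  for (i = 0; i < LIST_SIZE; i++)
    if (0 == b->target_list[i])
      return i;
  return i;
}

/* Record pid in the first free slot. Returns 1 if the list was
   already full (the caller would drop the block after this last
   send), 0 if the peer was recorded. */
static int
record_target (struct block_targets *b, peer_id pid)
{
  unsigned int i = count_used (b);

  assert (0 != pid);
  if (LIST_SIZE == i)
    return 1;
  b->target_list[i] = pid;
  return 0;
}
```

With a list size of 2, the first two sends fill the slots and the third send finds the list full, so the block is discarded after three transmissions in total.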


◆ count_targets()

static unsigned int count_targets (struct MigrationReadyBlock *block)

Count the number of peers this block has already been forwarded to.

Parameters
block    the block
Returns
number of times block was forwarded

Definition at line 274 of file gnunet-service-fs_push.c.

static unsigned int
count_targets (struct MigrationReadyBlock *block)
{
  unsigned int i;

  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
    if (block->target_list[i] == 0)
      return i;
  return i;
}

References MIGRATION_LIST_SIZE, and MigrationReadyBlock::target_list.

Referenced by find_content().


◆ score_content()

static long score_content (struct MigrationReadyPeer *mrp,
                           struct MigrationReadyBlock *block)

Check if sending this block to this peer would be a good idea.

Parameters
mrp      target peer
block    the block
Returns
score (>= 0: feasible, negative: infeasible)

Definition at line 294 of file gnunet-service-fs_push.c.

static long
score_content (struct MigrationReadyPeer *mrp,
               struct MigrationReadyBlock *block)
{
  unsigned int i;
  struct GSF_PeerPerformanceData *ppd;
  struct GNUNET_PeerIdentity id;
  struct GNUNET_HashCode hc;
  uint32_t dist;

  ppd = GSF_get_peer_performance_data_ (mrp->peer);
  for (i = 0; i < MIGRATION_LIST_SIZE; i++)
    if (block->target_list[i] == ppd->pid)
      return -1;
  GNUNET_assert (0 != ppd->pid);
  GNUNET_PEER_resolve (ppd->pid,
                       &id);
  GNUNET_CRYPTO_hash (&id,
                      sizeof(struct GNUNET_PeerIdentity),
                      &hc);
  dist = GNUNET_CRYPTO_hash_distance_u32 (&block->query,
                                          &hc);
  /* closer distance, higher score: */
  return UINT32_MAX - dist;
}

References GNUNET_assert, GNUNET_CRYPTO_hash(), GNUNET_CRYPTO_hash_distance_u32(), GNUNET_PEER_resolve(), GSF_get_peer_performance_data_(), id, MIGRATION_LIST_SIZE, MigrationReadyPeer::peer, GSF_PeerPerformanceData::pid, MigrationReadyBlock::query, and MigrationReadyBlock::target_list.

Referenced by find_content().
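
The scoring rule (reject peers that already received the block, otherwise prefer peers whose hashed identity is close to the block's query) can be sketched with toy 32-bit keys. Everything here is an assumption made for illustration: `toy_distance()` is a plain XOR and is not how GNUNET_CRYPTO_hash_distance_u32() computes its result, and `score_block()` takes the keys directly instead of resolving and hashing a peer identity.

```c
#include <assert.h>
#include <stdint.h>

#define LIST_SIZE 2   /* mirrors MIGRATION_LIST_SIZE */

/* Hypothetical stand-in for a hash distance between two keys. */
static uint32_t
toy_distance (uint32_t a, uint32_t b)
{
  return a ^ b;
}

/* Score a (block, peer) pair. Returns -1 if the peer is already in
   the block's target list, otherwise a value that grows as the
   distance shrinks. int64_t is wide enough for any uint32_t. */
static int64_t
score_block (const unsigned int targets[LIST_SIZE],
             unsigned int pid,
             uint32_t block_key,
             uint32_t peer_key)
{
  assert (0 != pid);
  for (unsigned int i = 0; i < LIST_SIZE; i++)
    if (targets[i] == pid)
      return -1;
  /* closer distance, higher score */
  return (int64_t) (UINT32_MAX - toy_distance (block_key, peer_key));
}
```

Subtracting the distance from the maximum keeps every feasible score non-negative, so -1 remains unambiguous as the "infeasible" marker.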


◆ consider_gathering()

static void consider_gathering (void)

If the migration task is not currently running, consider (re)scheduling it with the appropriate delay.

Definition at line 402 of file gnunet-service-fs_push.c.

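static void
consider_gathering (void)
{
  struct GNUNET_TIME_Relative delay;

  if (NULL == GSF_dsh)
    return;
  if (NULL != mig_qe)
    return;
  if (NULL != mig_task)
    return;
  if (mig_size >= MAX_MIGRATION_QUEUE)
    return;
  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
                                         mig_size);
  delay = GNUNET_TIME_relative_divide (delay,
                                       MAX_MIGRATION_QUEUE);
  delay = GNUNET_TIME_relative_max (delay,
                                    min_migration_delay);
  if (GNUNET_NO == value_found)
  {
    /* wait at least 5s if the datastore is empty */
    delay = GNUNET_TIME_relative_max (delay,
                                      GNUNET_TIME_relative_multiply (
                                        GNUNET_TIME_UNIT_SECONDS,
                                        5));
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Scheduling gathering task (queue size: %u)\n",
              mig_size);
  mig_task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &gather_migration_blocks,
                                           NULL);
}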

References gather_migration_blocks(), GNUNET_ERROR_TYPE_DEBUG, GNUNET_log, GNUNET_NO, GNUNET_SCHEDULER_add_delayed(), GNUNET_TIME_relative_divide(), GNUNET_TIME_relative_max(), GNUNET_TIME_relative_multiply(), GNUNET_TIME_UNIT_SECONDS, GSF_dsh, MAX_MIGRATION_QUEUE, mig_qe, mig_size, mig_task, min_migration_delay, and value_found.

Referenced by find_content(), gather_migration_blocks(), GSF_push_init_(), and process_migration_content().
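
As far as the listing and its references suggest, the polling delay grows with the fill level of the queue, is never shorter than `min_migration_delay`, and is at least five seconds when the last datastore request found nothing. The sketch below models that reading with plain microsecond integers; `gather_delay_us()` and its parameters are hypothetical, and the exact arithmetic is an assumption, not a copy of the GNUnet code.

```c
#include <assert.h>
#include <stdint.h>

#define QUEUE_MAX 8                  /* mirrors MAX_MIGRATION_QUEUE */
#define US_PER_SECOND 1000000ULL

static uint64_t
max_u64 (uint64_t a, uint64_t b)
{
  return (a > b) ? a : b;
}

/* Delay before asking the datastore again, in microseconds.
   queue_len:     blocks currently queued (must be below QUEUE_MAX,
                  since a full queue is not refilled)
   min_delay_us:  configured lower bound on the polling interval
   value_found:   non-zero if the previous request returned content */
static uint64_t
gather_delay_us (unsigned int queue_len,
                 uint64_t min_delay_us,
                 int value_found)
{
  uint64_t delay;

  assert (queue_len < QUEUE_MAX);
  delay = US_PER_SECOND * queue_len / QUEUE_MAX;
  delay = max_u64 (delay, min_delay_us);
  if (! value_found)
    delay = max_u64 (delay, 5 * US_PER_SECOND);
  return delay;
}
```

An empty queue is therefore refilled as fast as the configured minimum allows, while a nearly full queue is polled roughly once per second.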


◆ gather_migration_blocks()

static void gather_migration_blocks (void *cls)

Task that is run periodically to obtain blocks for content migration.

Parameters
cls    unused

Definition at line 539 of file gnunet-service-fs_push.c.

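static void
gather_migration_blocks (void *cls)
{
  mig_task = NULL;
  if (mig_size >= MAX_MIGRATION_QUEUE)
    return;
  if (NULL == GSF_dsh)
    return;
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asking datastore for content for replication (queue size: %u)\n",
              mig_size);
  value_found = GNUNET_NO;
  mig_qe = GNUNET_DATASTORE_get_for_replication (GSF_dsh,
                                                 0,
                                                 UINT_MAX,
                                                 &process_migration_content,
                                                 NULL);
  if (NULL == mig_qe)
    consider_gathering ();
}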

References consider_gathering(), GNUNET_DATASTORE_get_for_replication(), GNUNET_ERROR_TYPE_DEBUG, GNUNET_log, GNUNET_NO, GSF_dsh, MAX_MIGRATION_QUEUE, mig_qe, mig_size, mig_task, process_migration_content(), and value_found.

Referenced by consider_gathering().


◆ process_migration_content()

static void process_migration_content (void *cls,
                                       const struct GNUNET_HashCode *key,
                                       size_t size,
                                       const void *data,
                                       enum GNUNET_BLOCK_Type type,
                                       uint32_t priority,
                                       uint32_t anonymity,
                                       uint32_t replication,
                                       struct GNUNET_TIME_Absolute expiration,
                                       uint64_t uid)

Process content offered for migration.

Parameters
cls            closure
key            key for the content
size           number of bytes in data
data           content stored
type           type of the content
priority       priority of the content
anonymity      anonymity-level for the content
replication    replication-level for the content
expiration     expiration time for the content
uid            unique identifier for the datum; may be 0 if no unique identifier is available

Definition at line 453 of file gnunet-service-fs_push.c.

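static void
process_migration_content (void *cls,
                           const struct GNUNET_HashCode *key,
                           size_t size,
                           const void *data,
                           enum GNUNET_BLOCK_Type type,
                           uint32_t priority,
                           uint32_t anonymity,
                           uint32_t replication,
                           struct GNUNET_TIME_Absolute expiration,
                           uint64_t uid)
{
  struct MigrationReadyBlock *mb;
  struct MigrationReadyPeer *pos;

  mig_qe = NULL;
  if (NULL == key)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "No content found for migration...\n");
    consider_gathering ();
    return;
  }
  value_found = GNUNET_YES;
  if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value_us <
      MIN_MIGRATION_CONTENT_LIFETIME.rel_value_us)
  {
    /* content will expire soon, don't bother */
    consider_gathering ();
    return;
  }
  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
  {
    if (GNUNET_OK !=
        GNUNET_FS_handle_on_demand_block (key,
                                          size,
                                          data,
                                          type,
                                          priority,
                                          anonymity,
                                          replication,
                                          expiration,
                                          uid,
                                          &process_migration_content,
                                          NULL))
      consider_gathering ();
    return;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
              GNUNET_h2s (key),
              type, mig_size + 1,
              MAX_MIGRATION_QUEUE);
  mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size);
  mb->query = *key;
  mb->expiration = expiration;
  mb->size = size;
  mb->type = type;
  GNUNET_memcpy (&mb[1], data, size);
  GNUNET_CONTAINER_DLL_insert_after (mig_head,
                                     mig_tail,
                                     mig_tail,
                                     mb);
  mig_size++;
  for (pos = peer_head; NULL != pos; pos = pos->next)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Preparing to push best content to peer %s\n",
                GNUNET_i2s (GSF_connected_peer_get_identity2_ (pos->peer)));
    if ((NULL == pos->env) &&
        (GNUNET_YES == transmit_content (pos,
                                         mb)))
    {
      break;     /* 'mb' was freed! */
    }
  }
  consider_gathering ();
}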

References anonymity, consider_gathering(), data, MigrationReadyPeer::env, expiration, MigrationReadyBlock::expiration, GNUNET_BLOCK_TYPE_FS_ONDEMAND, GNUNET_CONTAINER_DLL_insert_after, GNUNET_ERROR_TYPE_DEBUG, GNUNET_FS_handle_on_demand_block(), GNUNET_h2s(), GNUNET_i2s(), GNUNET_log, GNUNET_malloc, GNUNET_memcpy, GNUNET_OK, GNUNET_TIME_absolute_get_remaining(), GNUNET_YES, GSF_connected_peer_get_identity2_(), key, MAX_MIGRATION_QUEUE, mig_head, mig_qe, mig_size, mig_tail, MIN_MIGRATION_CONTENT_LIFETIME, MigrationReadyPeer::next, MigrationReadyPeer::peer, peer_head, process_migration_content(), MigrationReadyBlock::query, GNUNET_TIME_Relative::rel_value_us, replication, size, MigrationReadyBlock::size, transmit_content(), type, MigrationReadyBlock::type, and value_found.

Referenced by gather_migration_blocks(), and process_migration_content().
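
The allocation pattern used when a block is queued (a fixed header followed by the payload in the same allocation, appended at the tail of a doubly-linked list with a separate size counter) can be sketched in plain C. This is a minimal illustration only, not the GNUnet implementation: struct Block, queue_block(), delete_block() and the globals are hypothetical stand-ins, and the standard library replaces the GNUNET_malloc and GNUNET_CONTAINER_DLL_* macros.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for struct MigrationReadyBlock: a fixed header
   that is followed directly by `size` bytes of payload. */
struct Block
{
  struct Block *next;
  struct Block *prev;
  size_t size;
};

static struct Block *head;      /* oldest queued block */
static struct Block *tail;      /* newest queued block */
static unsigned int queue_len;  /* number of blocks in the list */

/* Copy `data` into a freshly allocated block and append it at the tail.
   Returns NULL if the allocation fails. */
static struct Block *
queue_block (const void *data, size_t size)
{
  struct Block *b = calloc (1, sizeof (struct Block) + size);

  if (NULL == b)
    return NULL;
  b->size = size;
  memcpy (&b[1], data, size);   /* payload starts right after the header */
  b->prev = tail;
  if (NULL != tail)
    tail->next = b;
  else
    head = b;
  tail = b;
  queue_len++;
  return b;
}

/* Unlink a block, release it, and keep the counter in sync. */
static void
delete_block (struct Block *b)
{
  if (NULL != b->prev)
    b->prev->next = b->next;
  else
    head = b->next;
  if (NULL != b->next)
    b->next->prev = b->prev;
  else
    tail = b->prev;
  queue_len--;
  free (b);
}
```

Keeping header and payload in one allocation means a single free() releases both, which is why a block that has been handed off and freed must not be touched again by the caller.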

◆ GSF_push_start_()

void GSF_push_start_ ( struct GSF_ConnectedPeer * peer)

A peer connected to us, or we are once again allowed to push content to it.

Start pushing content to this peer.

Parameters
peer          handle for the peer that connected

Definition at line 567 of file gnunet-service-fs_push.c.

void
GSF_push_start_ (struct GSF_ConnectedPeer *peer)
{
  struct MigrationReadyPeer *mrp;

  if (GNUNET_YES != enabled)
    return;
  for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
    if (mrp->peer == peer)
      break;
  if (NULL != mrp)
  {
    /* same peer added twice, must not happen */
    GNUNET_break (0);
    return;
  }

  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Adding peer %s to list for pushing\n",
              GNUNET_i2s (GSF_connected_peer_get_identity2_ (peer)));

  mrp = GNUNET_new (struct MigrationReadyPeer);
  mrp->peer = peer;
  find_content (mrp);
  GNUNET_CONTAINER_DLL_insert (peer_head,
                               peer_tail,
                               mrp);
}

References enabled, find_content(), GNUNET_break, GNUNET_CONTAINER_DLL_insert, GNUNET_ERROR_TYPE_DEBUG, GNUNET_i2s(), GNUNET_log, GNUNET_new, GNUNET_YES, GSF_connected_peer_get_identity2_(), MigrationReadyPeer::next, MigrationReadyPeer::peer, peer_head, and peer_tail.

Referenced by peer_respect_cb(), and revive_migration().


◆ GSF_push_stop_()

void GSF_push_stop_ ( struct GSF_ConnectedPeer * peer)

A peer disconnected from us or asked us to stop pushing content for a while.

Stop pushing content to this peer.

Parameters
peer          handle for the peer that disconnected

Definition at line 603 of file gnunet-service-fs_push.c.

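void
GSF_push_stop_ (struct GSF_ConnectedPeer *peer)
{
  struct MigrationReadyPeer *pos;

  for (pos = peer_head; NULL != pos; pos = pos->next)
    if (pos->peer == peer)
      break;
  if (NULL == pos)
    return;
  /* cancel a message that is still pending for this peer */
  if (NULL != pos->env)
    GNUNET_MQ_send_cancel (pos->env);
  GNUNET_CONTAINER_DLL_remove (peer_head,
                               peer_tail,
                               pos);
  GNUNET_free (pos);
}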

References MigrationReadyPeer::env, GNUNET_CONTAINER_DLL_remove, GNUNET_free, GNUNET_MQ_send_cancel(), MigrationReadyPeer::next, MigrationReadyPeer::peer, peer_head, and peer_tail.

Referenced by GSF_peer_disconnect_handler(), and handle_p2p_migration_stop().
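
The peer bookkeeping done by GSF_push_start_() and GSF_push_stop_() (look the handle up by address, refuse duplicates on registration, silently ignore unknown peers on removal) can be sketched without any GNUnet dependencies. All names below (struct PeerEntry, find_peer(), push_start(), push_stop()) are hypothetical and exist only for this illustration; the return values are an addition of the sketch, since the real functions return void.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for struct MigrationReadyPeer. */
struct PeerEntry
{
  struct PeerEntry *next;
  struct PeerEntry *prev;
  const void *peer;             /* opaque handle, compared by address */
};

static struct PeerEntry *peers_head;
static struct PeerEntry *peers_tail;

/* Linear search for the entry that wraps `peer`; NULL if not registered. */
static struct PeerEntry *
find_peer (const void *peer)
{
  struct PeerEntry *pos;

  for (pos = peers_head; NULL != pos; pos = pos->next)
    if (pos->peer == peer)
      break;
  return pos;
}

/* Register a peer. Returns 1 on success, 0 if the peer is already
   registered or the allocation failed. */
static int
push_start (const void *peer)
{
  struct PeerEntry *e;

  if (NULL != find_peer (peer))
    return 0;                   /* same peer added twice */
  e = calloc (1, sizeof (*e));
  if (NULL == e)
    return 0;
  e->peer = peer;
  e->next = peers_head;         /* this sketch inserts at the head */
  if (NULL != peers_head)
    peers_head->prev = e;
  else
    peers_tail = e;
  peers_head = e;
  return 1;
}

/* Unregister a peer. Returns 1 if it was removed, 0 if it was unknown. */
static int
push_stop (const void *peer)
{
  struct PeerEntry *e = find_peer (peer);

  if (NULL == e)
    return 0;
  if (NULL != e->prev)
    e->prev->next = e->next;
  else
    peers_head = e->next;
  if (NULL != e->next)
    e->next->prev = e->prev;
  else
    peers_tail = e->prev;
  free (e);
  return 1;
}
```

A linear scan is adequate here on the assumption that the number of connected peers stays small; a larger deployment of the same idea would likely use a hash map keyed by the handle.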


◆ GSF_push_init_()

void GSF_push_init_ ( void  )

Setup the module.

Definition at line 625 of file gnunet-service-fs_push.c.

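void
GSF_push_init_ ()
{
  enabled =
    GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
                                          "FS",
                                          "CONTENT_PUSHING");
  if (GNUNET_YES != enabled)
    return;

  if (GNUNET_OK !=
      GNUNET_CONFIGURATION_get_value_time (GSF_cfg,
                                           "fs",
                                           "MIN_MIGRATION_DELAY",
                                           &min_migration_delay))
  {
    GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_WARNING,
                               "fs",
                               "MIN_MIGRATION_DELAY",
                               _ ("time required, content pushing disabled"));
    return;
  }
  consider_gathering ();
}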

References _, consider_gathering(), enabled, GNUNET_CONFIGURATION_get_value_time(), GNUNET_CONFIGURATION_get_value_yesno(), GNUNET_ERROR_TYPE_WARNING, GNUNET_log_config_invalid(), GNUNET_OK, GNUNET_YES, GSF_cfg, and min_migration_delay.

Referenced by run().
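
The two options read during setup could appear in a configuration file roughly as follows. The option names are taken from the listing above; the values are illustrative assumptions, and both options are written under a single [fs] section on the assumption that section names are matched case-insensitively.

```ini
[fs]
# Enable pushing of migratable content to other peers.
CONTENT_PUSHING = YES
# Minimum delay between datastore polls for migration content (example value).
MIN_MIGRATION_DELAY = 100 ms
```

If MIN_MIGRATION_DELAY is missing or cannot be parsed as a time value, setup logs a warning and returns before scheduling any gathering, so no content is collected for pushing.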


◆ GSF_push_done_()

void GSF_push_done_ ( void  )

Shutdown the module.

Definition at line 654 of file gnunet-service-fs_push.c.

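void
GSF_push_done_ ()
{
  if (NULL != mig_task)
  {
    GNUNET_SCHEDULER_cancel (mig_task);
    mig_task = NULL;
  }
  if (NULL != mig_qe)
  {
    GNUNET_DATASTORE_cancel (mig_qe);
    mig_qe = NULL;
  }
  /* release every block that is still queued */
  while (NULL != mig_head)
    delete_migration_block (mig_head);
  GNUNET_assert (0 == mig_size);
}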

References delete_migration_block(), GNUNET_assert, GNUNET_DATASTORE_cancel(), GNUNET_SCHEDULER_cancel(), mig_head, mig_qe, mig_size, and mig_task.

Referenced by shutdown_task().

Variable Documentation

◆ mig_head

struct MigrationReadyBlock* mig_head
static

Head of linked list of blocks that can be migrated.

Definition at line 134 of file gnunet-service-fs_push.c.

Referenced by delete_migration_block(), find_content(), GSF_push_done_(), and process_migration_content().

◆ mig_tail

struct MigrationReadyBlock* mig_tail
static

Tail of linked list of blocks that can be migrated.

Definition at line 139 of file gnunet-service-fs_push.c.

Referenced by delete_migration_block(), and process_migration_content().

◆ peer_head

struct MigrationReadyPeer* peer_head
static

Head of linked list of peers.

Definition at line 144 of file gnunet-service-fs_push.c.

Referenced by GSF_push_start_(), GSF_push_stop_(), and process_migration_content().

◆ peer_tail

struct MigrationReadyPeer* peer_tail
static

Tail of linked list of peers.

Definition at line 149 of file gnunet-service-fs_push.c.

Referenced by GSF_push_start_(), and GSF_push_stop_().

◆ mig_qe

struct GNUNET_DATASTORE_QueueEntry* mig_qe
static

Request to datastore for migration (or NULL).

Definition at line 154 of file gnunet-service-fs_push.c.

Referenced by consider_gathering(), gather_migration_blocks(), GSF_push_done_(), and process_migration_content().

◆ mig_task

struct GNUNET_SCHEDULER_Task* mig_task
static

ID of task that collects blocks for migration.

Definition at line 159 of file gnunet-service-fs_push.c.

Referenced by consider_gathering(), gather_migration_blocks(), and GSF_push_done_().

◆ min_migration_delay

struct GNUNET_TIME_Relative min_migration_delay
static

What is the maximum frequency at which we are allowed to poll the datastore for migration content?

Definition at line 165 of file gnunet-service-fs_push.c.

Referenced by consider_gathering(), and GSF_push_init_().

◆ mig_size

unsigned int mig_size
static

Size of the doubly-linked list of migration blocks.

Definition at line 170 of file gnunet-service-fs_push.c.

Referenced by consider_gathering(), delete_migration_block(), find_content(), gather_migration_blocks(), GSF_push_done_(), and process_migration_content().

◆ enabled

int enabled
static

Is this module enabled?

Definition at line 175 of file gnunet-service-fs_push.c.

Referenced by GSF_push_init_(), and GSF_push_start_().

◆ value_found

int value_found
static

Did we find anything in the datastore?

Definition at line 180 of file gnunet-service-fs_push.c.

Referenced by consider_gathering(), gather_migration_blocks(), and process_migration_content().