GNUnet 0.21.1
gnunet-service-fs_push.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
27#include "platform.h"
28#include "gnunet-service-fs.h"
32
33
37#define MAX_MIGRATION_QUEUE 8
38
43#define MIGRATION_LIST_SIZE 2
44
52#define MIN_MIGRATION_CONTENT_LIFETIME GNUNET_TIME_relative_multiply ( \
53 GNUNET_TIME_UNIT_MINUTES, 30)
54
55
60{
65
70
75
80
86
90 size_t size;
91
95 unsigned int used_targets;
96
101};
102
103
108{
113
118
123
128};
129
130
135
140
145
150
155
160
166
170static unsigned int mig_size;
171
175static int enabled;
176
180static int value_found;
181
182
188static void
190{
192 mig_tail,
193 mb);
196 mig_size--;
197 GNUNET_free (mb);
198}
199
200
206static void
207find_content (void *cls);
208
209
217static int
219 struct MigrationReadyBlock *block)
220{
221 struct PutMessage *msg;
222 unsigned int i;
223 struct GSF_PeerPerformanceData *ppd;
224 int ret;
225
227 GNUNET_assert (NULL == mrp->env);
229 block->size,
231 msg->type = htonl (block->type);
232 msg->expiration = GNUNET_TIME_absolute_hton (block->expiration);
233 GNUNET_memcpy (&msg[1],
234 &block[1],
235 block->size);
236 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
237 {
238 if (block->target_list[i] == 0)
239 {
240 block->target_list[i] = ppd->pid;
242 1);
243 break;
244 }
245 }
246 if (MIGRATION_LIST_SIZE == i)
247 {
249 ret = GNUNET_YES;
250 }
251 else
252 {
253 ret = GNUNET_NO;
254 }
257 mrp);
259 GNUNET_NO,
260 0 /* priority */,
261 mrp->env);
262 return ret;
263}
264
265
273static unsigned int
275{
276 unsigned int i;
277
278 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
279 if (block->target_list[i] == 0)
280 return i;
281 return i;
282}
283
284
293static long
295 struct MigrationReadyBlock *block)
296{
297 unsigned int i;
298 struct GSF_PeerPerformanceData *ppd;
299 struct GNUNET_PeerIdentity id;
300 struct GNUNET_HashCode hc;
301 uint32_t dist;
302
304 for (i = 0; i < MIGRATION_LIST_SIZE; i++)
305 if (block->target_list[i] == ppd->pid)
306 return -1;
307 GNUNET_assert (0 != ppd->pid);
309 &id);
311 sizeof(struct GNUNET_PeerIdentity),
312 &hc);
314 &hc);
315 /* closer distance, higher score: */
316 return UINT32_MAX - dist;
317}
318
319
324static void
325consider_gathering (void);
326
327
328static void
329find_content (void *cls)
330{
331 struct MigrationReadyPeer *mrp = cls;
332 struct MigrationReadyBlock *pos;
333 long score;
334 long best_score;
335 struct MigrationReadyBlock *best;
336
337 mrp->env = NULL;
338 best = NULL;
339 best_score = -1;
340 pos = mig_head;
341 while (NULL != pos)
342 {
343 score = score_content (mrp, pos);
344 if (score > best_score)
345 {
346 best_score = score;
347 best = pos;
348 }
349 pos = pos->next;
350 }
351 if (NULL == best)
352 {
354 {
356 "No content found for pushing, waiting for queue to fill\n");
357 return; /* will fill up eventually... */
358 }
360 "No suitable content found, purging content from full queue\n");
361 /* failed to find migration target AND
362 * queue is full, purge most-forwarded
363 * block from queue to make room for more */
364 pos = mig_head;
365 while (NULL != pos)
366 {
367 score = count_targets (pos);
368 if (score >= best_score)
369 {
370 best_score = score;
371 best = pos;
372 }
373 pos = pos->next;
374 }
375 GNUNET_assert (NULL != best);
378 return;
379 }
381 "Preparing to push best content to peer\n");
382 transmit_content (mrp,
383 best);
384}
385
386
393static void
394gather_migration_blocks (void *cls);
395
396
401static void
403{
404 struct GNUNET_TIME_Relative delay;
405
406 if (NULL == GSF_dsh)
407 return;
408 if (NULL != mig_qe)
409 return;
410 if (NULL != mig_task)
411 return;
413 return;
415 mig_size);
416 delay = GNUNET_TIME_relative_divide (delay,
418 delay = GNUNET_TIME_relative_max (delay,
420 if (GNUNET_NO == value_found)
421 {
422 /* wait at least 5s if the datastore is empty */
423 delay = GNUNET_TIME_relative_max (delay,
426 5));
427 }
429 "Scheduling gathering task (queue size: %u)\n",
430 mig_size);
433 NULL);
434}
435
436
452static void
454 const struct GNUNET_HashCode *key,
455 size_t size,
456 const void *data,
458 uint32_t priority,
459 uint32_t anonymity,
460 uint32_t replication,
462 uint64_t uid)
463{
464 struct MigrationReadyBlock *mb;
465 struct MigrationReadyPeer *pos;
466
467 mig_qe = NULL;
468 if (NULL == key)
469 {
471 "No content found for migration...\n");
473 return;
474 }
478 {
479 /* content will expire soon, don't bother */
481 return;
482 }
484 {
485 if (GNUNET_OK !=
487 size,
488 data,
489 type,
490 priority,
491 anonymity,
494 uid,
496 NULL))
498 return;
499 }
501 "Retrieved block `%s' of type %u for migration (queue size: %u/%u)\n",
502 GNUNET_h2s (key),
503 type, mig_size + 1,
505 mb = GNUNET_malloc (sizeof(struct MigrationReadyBlock) + size);
506 mb->query = *key;
508 mb->size = size;
509 mb->type = type;
510 GNUNET_memcpy (&mb[1], data, size);
512 mig_tail,
513 mig_tail,
514 mb);
515 mig_size++;
516 for (pos = peer_head; NULL != pos; pos = pos->next)
517 {
519 "Preparing to push best content to peer %s\n",
521 if ((NULL == pos->env) &&
523 mb)))
524 {
525 break; /* 'mb' was freed! */
526 }
527 }
529}
530
531
538static void
540{
541 mig_task = NULL;
543 return;
544 if (NULL == GSF_dsh)
545 return;
547 "Asking datastore for content for replication (queue size: %u)\n",
548 mig_size);
551 0,
552 UINT_MAX,
554 NULL);
555 if (NULL == mig_qe)
557}
558
559
566void
568{
569 struct MigrationReadyPeer *mrp;
570
571 if (GNUNET_YES != enabled)
572 return;
573 for (mrp = peer_head; NULL != mrp; mrp = mrp->next)
574 if (mrp->peer == peer)
575 break;
576 if (NULL != mrp)
577 {
578 /* same peer added twice, must not happen */
579 GNUNET_break (0);
580 return;
581 }
582
584 "Adding peer %s to list for pushing\n",
586
587 mrp = GNUNET_new (struct MigrationReadyPeer);
588 mrp->peer = peer;
589 find_content (mrp);
591 peer_tail,
592 mrp);
593}
594
595
602void
604{
605 struct MigrationReadyPeer *pos;
606
607 for (pos = peer_head; NULL != pos; pos = pos->next)
608 if (pos->peer == peer)
609 break;
610 if (NULL == pos)
611 return;
612 if (NULL != pos->env)
615 peer_tail,
616 pos);
617 GNUNET_free (pos);
618}
619
620
624void
626{
627 enabled =
629 "FS",
630 "CONTENT_PUSHING");
631 if (GNUNET_YES != enabled)
632 return;
633
634 if (GNUNET_OK !=
636 "fs",
637 "MIN_MIGRATION_DELAY",
639 {
641 "fs",
642 "MIN_MIGRATION_DELAY",
643 _ ("time required, content pushing disabled"));
644 return;
645 }
647}
648
649
653void
655{
656 if (NULL != mig_task)
657 {
659 mig_task = NULL;
660 }
661 if (NULL != mig_qe)
662 {
664 mig_qe = NULL;
665 }
666 while (NULL != mig_head)
668 GNUNET_assert (0 == mig_size);
669}
670
671
672/* end of gnunet-service-fs_push.c */
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static int ret
Final status code.
Definition: gnunet-arm.c:94
static unsigned int replication
Desired replication level.
static char * data
The data to insert into the dht.
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_TIME_Relative expiration
User supplied expiration value.
static unsigned int anonymity
static struct GNUNET_IDENTITY_Handle * id
Handle to IDENTITY.
static uint32_t type
Type string converted to DNS type value.
const struct GNUNET_CONFIGURATION_Handle * GSF_cfg
Our configuration.
struct GNUNET_DATASTORE_Handle * GSF_dsh
Our connection to the datastore.
shared data structures of gnunet-service-fs.c
struct GSF_PeerPerformanceData * GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
Return the performance data record for the given peer.
const struct GNUNET_PeerIdentity * GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
Obtain the identity of a connected peer.
void GSF_peer_transmit_(struct GSF_ConnectedPeer *cp, int is_query, uint32_t priority, struct GNUNET_MQ_Envelope *env)
Transmit a message to the given peer as soon as possible.
API to handle 'connected peers'.
int GNUNET_FS_handle_on_demand_block(const struct GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid, GNUNET_DATASTORE_DatumProcessor cont, void *cont_cls)
We've received an on-demand encoded block from the datastore.
indexing for the file-sharing service
static int enabled
Is this module enabled?
static unsigned int mig_size
Size of the doubly-linked list of migration blocks.
static struct GNUNET_SCHEDULER_Task * mig_task
ID of task that collects blocks for migration.
static unsigned int count_targets(struct MigrationReadyBlock *block)
Count the number of peers this block has already been forwarded to.
static struct MigrationReadyPeer * peer_head
Head of linked list of peers.
static void find_content(void *cls)
Find content for migration to this peer.
static struct MigrationReadyBlock * mig_tail
Tail of linked list of blocks that can be migrated.
static struct MigrationReadyBlock * mig_head
Head of linked list of blocks that can be migrated.
static long score_content(struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
Check if sending this block to this peer would be a good idea.
static struct GNUNET_TIME_Relative min_migration_delay
What is the maximum frequency at which we are allowed to poll the datastore for migration content?
void GSF_push_init_()
Setup the module.
static int value_found
Did we find anything in the datastore?
void GSF_push_done_()
Shutdown the module.
static void consider_gathering(void)
If the migration task is not currently running, consider (re)scheduling it with the appropriate delay...
void GSF_push_stop_(struct GSF_ConnectedPeer *peer)
A peer disconnected from us.
#define MAX_MIGRATION_QUEUE
Maximum number of blocks we keep in memory for migration.
static struct MigrationReadyPeer * peer_tail
Tail of linked list of peers.
static int transmit_content(struct MigrationReadyPeer *mrp, struct MigrationReadyBlock *block)
Send the given block to the given peer.
#define MIGRATION_LIST_SIZE
Blocks are at most migrated to this number of peers plus one, each time they are fetched from the dat...
static void delete_migration_block(struct MigrationReadyBlock *mb)
Delete the given migration block.
static void gather_migration_blocks(void *cls)
Task that is run periodically to obtain blocks for content migration.
void GSF_push_start_(struct GSF_ConnectedPeer *peer)
A peer connected to us.
static void process_migration_content(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process content offered for migration.
static struct GNUNET_DATASTORE_QueueEntry * mig_qe
Request to datastore for migration (or NULL).
#define MIN_MIGRATION_CONTENT_LIFETIME
How long must content remain valid for us to consider it for migration? If content will expire too so...
support for pushing out content
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_yesno(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option)
Get a configuration value that should be in a set of "YES" or "NO".
enum GNUNET_GenericReturnValue GNUNET_CONFIGURATION_get_value_time(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *section, const char *option, struct GNUNET_TIME_Relative *time)
Get a configuration value that should be a relative time.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
uint32_t GNUNET_CRYPTO_hash_distance_u32(const struct GNUNET_HashCode *a, const struct GNUNET_HashCode *b)
Compute the distance between 2 hashcodes.
Definition: crypto_hash.c:89
#define GNUNET_log(kind,...)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
const char * GNUNET_i2s(const struct GNUNET_PeerIdentity *pid)
Convert a peer identity to a string (for printing debug messages).
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
void GNUNET_log_config_invalid(enum GNUNET_ErrorType kind, const char *section, const char *option, const char *required)
Log error message about invalid configuration option value.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_DEBUG
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.
void GNUNET_MQ_send_cancel(struct GNUNET_MQ_Envelope *ev)
Cancel sending the message.
Definition: mq.c:768
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:63
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:638
void GNUNET_PEER_decrement_rcs(const GNUNET_PEER_Id *ids, unsigned int count)
Decrement multiple RCs of peer identities by one.
Definition: peer.c:157
unsigned int GNUNET_PEER_Id
A GNUNET_PEER_Id is simply a shorter version of a "struct GNUNET_PeerIdentifier" that can be used ins...
void GNUNET_PEER_change_rc(GNUNET_PEER_Id id, int delta)
Change the reference counter of an interned PID.
Definition: peer.c:192
void GNUNET_PEER_resolve(GNUNET_PEER_Id id, struct GNUNET_PeerIdentity *pid)
Convert an interned PID to a normal peer identity.
Definition: peer.c:220
#define GNUNET_MESSAGE_TYPE_FS_PUT
P2P response with content or active migration of content.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:981
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1278
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:351
#define GNUNET_TIME_UNIT_SECONDS
One second.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:405
struct GNUNET_TIME_Relative GNUNET_TIME_relative_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Multiply relative time by a given factor.
Definition: time.c:484
struct GNUNET_TIME_Relative GNUNET_TIME_relative_divide(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Divide relative time by a given factor.
Definition: time.c:550
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:638
static unsigned int size
Size of the "table".
Definition: peer.c:68
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
GNUNET_BLOCK_Type
WARNING: This header is generated! In order to add DHT block types, you must register them in GANA,...
@ GNUNET_BLOCK_TYPE_FS_ONDEMAND
Type of a block representing a block to be encoded on demand from disk.
Entry in our priority queue.
A 512-bit hashcode.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
The identity of the host (wraps the signing key of the peer).
Entry in list of pending tasks.
Definition: scheduler.c:136
Time for absolute times used by GNUnet, in microseconds.
Time for relative time used by GNUnet, in microseconds.
A connected peer.
Performance data kept for a peer.
GNUNET_PEER_Id pid
The peer's identity (interned version).
Block that is ready for migration to other peers.
GNUNET_PEER_Id target_list[2]
Peers we already forwarded this block to.
enum GNUNET_BLOCK_Type type
Type of the block.
struct GNUNET_HashCode query
Query for the block.
unsigned int used_targets
Number of targets already used.
struct MigrationReadyBlock * next
This is a doubly-linked list.
size_t size
Size of the block.
struct GNUNET_TIME_Absolute expiration
When does this block expire?
struct MigrationReadyBlock * prev
This is a doubly-linked list.
Information about a peer waiting for migratable data.
struct GSF_ConnectedPeer * peer
Handle to peer.
struct GNUNET_MQ_Envelope * env
Envelope of the currently pushed message.
struct MigrationReadyPeer * prev
This is a doubly-linked list.
struct MigrationReadyPeer * next
This is a doubly-linked list.
Response from FS service with a result for a previous FS search.
Definition: fs.h:330