GNUnet  0.10.x
fragmentation.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2009-2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
27 #include "gnunet_protocols.h"
28 #include "fragmentation.h"
29 
30 
/**
 * Absolute minimum delay we impose between sending and expecting ACK to arrive.
 */
34 #define MIN_ACK_DELAY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS, 1)
35 
36 
45 
50 
55 
60 
65 
70 
/* Message to fragment (allocated at the end of this struct). */
74  const struct GNUNET_MessageHeader *msg;
75 
80 
/* Closure for proc. */
84  void *proc_cls;
85 
/* Bitfield, set to 1 for each unacknowledged fragment. */
89  uint64_t acks;
90 
/* Bitfield with all possible bits for acks (used to mask the ack we get back). */
95  uint64_t acks_mask;
96 
101 
/* Our fragmentation ID. */
105  uint32_t fragment_id;
106 
/* Round-robin selector for the next transmission (a bit index, kept in 0..63). */
110  unsigned int next_transmission;
111 
/* How many rounds of transmission have we completed so far? */
115  unsigned int num_rounds;
116 
/* How many transmissions have we completed in this round? */
120  unsigned int num_transmissions;
121 
/* GNUNET_YES if we called proc and are now waiting for
 * GNUNET_FRAGMENT_context_transmission_done(). */
125  int8_t proc_busy;
126 
/* GNUNET_YES if we are waiting for an ACK. */
130  int8_t wack;
131 
/* Target fragment size. */
135  uint16_t mtu;
136 };
137 
138 
/**
 * Convert an ACK message to a printable format suitable for logging.
 *
 * The text is written into a function-local static buffer, so the returned
 * pointer is overwritten by the next call.
 *
 * @param ack message to print
 * @return "<malformed ack>" if the size field of @a ack does not equal
 *         sizeof(struct FragmentAcknowledgement); otherwise the static
 *         buffer holding "<fragment id>-<acknowledged bits in hex>"
 */
145 const char *
147 {
148  static char buf[128];
149  const struct FragmentAcknowledgement *fa;
150 
/* the size field is in network byte order: convert with ntohs() */
151  if (sizeof(struct FragmentAcknowledgement) !=
152  ntohs(ack->size))
153  return "<malformed ack>";
154  fa = (const struct FragmentAcknowledgement *)ack;
/* cast so that the argument type matches the %llX conversion exactly */
155  GNUNET_snprintf(buf,
156  sizeof(buf),
157  "%u-%llX",
158  ntohl(fa->fragment_id),
159  (unsigned long long)GNUNET_ntohll(fa->bits));
160  return buf;
161 }
162 
163 
/**
 * Transmit the next fragment to the other peer.
 *
 * Picks the next unacknowledged fragment in round-robin order, builds a
 * fragment message for it in a local buffer and hands it to fc->proc.
 * Returns early without transmitting when every fragment is acknowledged
 * or when a positive delay has to be waited out first.
 *
 * @param cls the `struct GNUNET_FRAGMENT_Context`
 */
169 static void
170 transmit_next(void *cls)
171 {
172  struct GNUNET_FRAGMENT_Context *fc = cls;
/* NOTE(review): variable-length array of fc->mtu bytes; mtu is a uint16_t */
173  char msg[fc->mtu];
174  const char *mbuf;
175  struct FragmentHeader *fh;
176  struct GNUNET_TIME_Relative delay;
177  unsigned int bit;
178  size_t size;
179  size_t fsize;
180  int wrap;
181 
182  fc->task = NULL;
184  if (0 == fc->acks)
185  return; /* all done */
186  /* calculate delay */
187  wrap = 0;
/* advance the selector to the next set (unacknowledged) bit, recording
 * whether it wrapped around to bit 0 on the way */
188  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
189  {
190  fc->next_transmission = (fc->next_transmission + 1) % 64;
191  wrap |= (0 == fc->next_transmission);
192  }
193  bit = fc->next_transmission;
194  size = ntohs(fc->msg->size);
/* the last fragment carries only the remainder of the message plus the
 * fragment header; every other fragment fills the whole MTU */
195  if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
196  fsize =
197  (size % (fc->mtu - sizeof(struct FragmentHeader))) +
198  sizeof(struct FragmentHeader);
199  else
200  fsize = fc->mtu;
201  if (NULL != fc->tracker)
203  fsize);
204  else
205  delay = GNUNET_TIME_UNIT_ZERO;
206  if (delay.rel_value_us > 0)
207  {
209  "Fragmentation logic delays transmission of next fragment by %s\n",
211  GNUNET_YES));
213  &transmit_next,
214  fc);
215  return;
216  }
/* 'bit' is the fragment sent now; move the selector on to the next
 * unacknowledged fragment for the following call */
217  fc->next_transmission = (fc->next_transmission + 1) % 64;
218  wrap |= (0 == fc->next_transmission);
219  while (0 == (fc->acks & (1LLU << fc->next_transmission)))
220  {
221  fc->next_transmission = (fc->next_transmission + 1) % 64;
222  wrap |= (0 == fc->next_transmission);
223  }
224 
225  /* assemble fragmentation message */
/* the message copy is stored directly behind the context struct */
226  mbuf = (const char *)&fc[1];
227  fh = (struct FragmentHeader *)msg;
228  fh->header.size = htons(fsize);
230  fh->fragment_id = htonl(fc->fragment_id);
231  fh->total_size = fc->msg->size; /* already in big-endian */
232  fh->offset = htons((fc->mtu - sizeof(struct FragmentHeader)) * bit);
/* copy this fragment's slice of the payload right behind the header */
233  GNUNET_memcpy(&fh[1], &mbuf[bit * (fc->mtu - sizeof(struct FragmentHeader))],
234  fsize - sizeof(struct FragmentHeader));
235  if (NULL != fc->tracker)
238  _("# fragments transmitted"),
239  1,
240  GNUNET_NO);
241  if (0 != fc->last_round.abs_value_us)
243  _("# fragments retransmitted"),
244  1,
245  GNUNET_NO);
246 
247  /* select next message to calculate delay */
248  bit = fc->next_transmission;
249  size = ntohs(fc->msg->size);
/* count the fragment header for the last fragment too, exactly as in the
 * size computation above: the fragment goes out with its header, so the
 * bandwidth query must cover those bytes */
250  if (bit == size / (fc->mtu - sizeof(struct FragmentHeader)))
251  fsize = size % (fc->mtu - sizeof(struct FragmentHeader)) + sizeof(struct FragmentHeader);
252  else
253  fsize = fc->mtu;
254  if (NULL != fc->tracker)
256  fsize);
257  else
258  delay = GNUNET_TIME_UNIT_ZERO;
/* the shift below is only defined for num_rounds < 64 */
259  if (fc->num_rounds < 64)
260  delay = GNUNET_TIME_relative_max(delay,
262  (fc->msg_delay,
263  (1ULL << fc->num_rounds)));
264  else
266  if (wrap)
267  {
268  /* full round transmitted wait 2x delay for ACK before going again */
269  fc->num_rounds++;
271  /* never use zero, need some time for ACK always */
272  delay = GNUNET_TIME_relative_max(MIN_ACK_DELAY, delay);
273  fc->wack = GNUNET_YES;
276  _("# fragments wrap arounds"),
277  1,
278  GNUNET_NO);
279  }
/* mark the processor busy before invoking it; the flag is cleared again in
 * GNUNET_FRAGMENT_context_transmission_done() */
280  fc->proc_busy = GNUNET_YES;
282  fc->num_transmissions++;
283  fc->proc(fc->proc_cls,
284  &fh->header);
285 }
286 
287 
/**
 * Create a fragmentation context for the given message.
 *
 * Allocates the context together with a private copy of @a msg, stores the
 * given parameters in it and marks every fragment as unacknowledged.
 * Aborts via GNUNET_assert if @a mtu is below
 * 1024 + sizeof(struct FragmentHeader), if the message is smaller than a
 * message header, or if more than 64 fragments would be needed.
 *
 * @param stats statistics handle stored in the context
 * @param mtu the maximum size of a fragment, including the fragment header
 * @param tracker bandwidth tracker stored in the context (transmit_next
 *        checks it against NULL before use)
 * @param msg_delay initial delay between messages
 * @param ack_delay initial expected delay for ACKs
 * @param msg the message to fragment; it is copied
 * @param proc function called for each fragment to transmit
 * @param proc_cls closure for @a proc
 * @return the new fragmentation context
 */
310  uint16_t mtu,
314  const struct GNUNET_MessageHeader *msg,
316  void *proc_cls)
317 {
318  struct GNUNET_FRAGMENT_Context *fc;
319  size_t size;
320  uint64_t bits;
321 
323  _("# messages fragmented"),
324  1,
325  GNUNET_NO);
/* at least 1024 payload bytes per fragment: with a 16-bit message size this
 * keeps the fragment count within the 64 bits of the ack bitfield */
326  GNUNET_assert(mtu >= 1024 + sizeof(struct FragmentHeader));
327  size = ntohs(msg->size);
329  _("# total size of fragmented messages"),
330  size, GNUNET_NO);
331  GNUNET_assert(size >= sizeof(struct GNUNET_MessageHeader));
/* single allocation: the context followed by room for the message copy */
332  fc = GNUNET_malloc(sizeof(struct GNUNET_FRAGMENT_Context) + size);
333  fc->stats = stats;
334  fc->mtu = mtu;
335  fc->tracker = tracker;
336  fc->ack_delay = ack_delay;
337  fc->msg_delay = msg_delay;
338  fc->msg = (const struct GNUNET_MessageHeader *)&fc[1];
339  fc->proc = proc;
340  fc->proc_cls = proc_cls;
341  fc->fragment_id =
343  UINT32_MAX);
344  GNUNET_memcpy(&fc[1], msg, size);
/* number of fragments: message size divided by the payload bytes per
 * fragment, rounded up */
345  bits =
346  (size + mtu - sizeof(struct FragmentHeader) - 1) / (mtu -
347  sizeof(struct
348  FragmentHeader));
349  GNUNET_assert(bits <= 64);
/* shifting a 64-bit value by 64 is undefined, hence the special case */
350  if (bits == 64)
351  fc->acks_mask = UINT64_MAX; /* set all 64 bit */
352  else
353  fc->acks_mask = (1LLU << bits) - 1; /* set lowest 'bits' bit */
/* initially every fragment is unacknowledged */
354  fc->acks = fc->acks_mask;
356  return fc;
357 }
358 
359 
/**
 * Continuation to call from the 'proc' function after the fragment has been
 * transmitted.
 *
 * Clears the busy flag and stores a new task in fc->task whose callback is
 * transmit_next with @a fc as closure.
 * NOTE(review): the scheduling call itself is on a line missing from this
 * listing, so the time at which the task runs cannot be stated here.
 *
 * @param fc fragmentation context
 */
367 void
369 {
371  fc->proc_busy = GNUNET_NO;
/* there must be no task pending at this point */
372  GNUNET_assert(fc->task == NULL);
373  fc->task =
375  &transmit_next,
376  fc);
377 }
378 
379 
/**
 * Process an acknowledgement message we got from the other side (to control
 * re-transmits).
 *
 * @param fc fragmentation context
 * @param msg acknowledgement message we received
 * @return #GNUNET_OK if all fragments are now acknowledged,
 *         #GNUNET_NO if fragments remain to be transmitted,
 *         #GNUNET_SYSERR if @a msg has the wrong size or carries a fragment
 *         ID different from ours
 */
391 int
393  const struct GNUNET_MessageHeader *msg)
394 {
395  const struct FragmentAcknowledgement *fa;
396  uint64_t abits;
397  struct GNUNET_TIME_Relative ndelay;
398  unsigned int ack_cnt;
399  unsigned int snd_cnt;
400  unsigned int i;
401 
402  if (sizeof(struct FragmentAcknowledgement) != ntohs(msg->size))
403  {
404  GNUNET_break_op(0);
405  return GNUNET_SYSERR;
406  }
407  fa = (const struct FragmentAcknowledgement *)msg;
408  if (ntohl(fa->fragment_id) != fc->fragment_id)
409  return GNUNET_SYSERR; /* not our ACK */
410  abits = GNUNET_ntohll(fa->bits);
411  if ((GNUNET_YES == fc->wack) &&
412  (0 != fc->num_transmissions))
413  {
414  /* normal ACK, can update running average of delay... */
415  fc->wack = GNUNET_NO;
417  fc->ack_delay.rel_value_us =
418  (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->ack_delay.rel_value_us) / 4;
419  /* calculate ratio msg sent vs. msg acked */
420  ack_cnt = 0;
421  snd_cnt = 0;
422  for (i = 0; i < 64; i++)
423  {
/* test whether bit i is set; comparing the masked value against 1 would
 * only ever match for i == 0 */
424  if (0 != (fc->acks_mask & (1ULL << i)))
425  {
426  snd_cnt++;
/* a cleared bit in the ACK means the fragment was received */
427  if (0 == (abits & (1ULL << i)))
428  ack_cnt++;
429  }
430  }
431  if (0 == ack_cnt)
432  {
433  /* complete loss */
435  snd_cnt);
436  }
437  else if (snd_cnt > ack_cnt)
438  {
/* NOTE(review): ack_cnt < snd_cnt here, so this expression reduces
 * msg_delay, while the comment below speaks of slowing down -- confirm
 * the intended direction of the ratio */
439  /* some loss, slow down proportionally */
440  fc->msg_delay.rel_value_us = ((fc->msg_delay.rel_value_us * ack_cnt) / snd_cnt);
441  }
442  else if (snd_cnt == ack_cnt)
443  {
444  fc->msg_delay.rel_value_us =
445  (ndelay.rel_value_us / fc->num_transmissions + 3 * fc->msg_delay.rel_value_us) / 5;
446  }
447  fc->num_transmissions = 0;
452  }
454  _("# fragment acknowledgements received"),
455  1,
456  GNUNET_NO);
/* the ACK marks fragments as missing that we had already seen acknowledged */
457  if (abits != (fc->acks & abits))
458  {
459  /* ID collision or message reordering, count! This should be rare! */
461  _("# bits removed from fragmentation ACKs"), 1,
462  GNUNET_NO);
463  }
/* adopt the receiver's view, ignoring bits beyond our fragment count */
464  fc->acks = abits & fc->acks_mask;
465  if (0 != fc->acks)
466  {
467  /* more to transmit, do so right now (if tracker permits...) */
468  if (fc->task != NULL)
469  {
470  /* schedule next transmission now, no point in waiting... */
473  }
474  else
475  {
476  /* only case where there is no task should be if we're waiting
477  * for the right to transmit again (proc_busy set to YES) */
479  }
480  return GNUNET_NO;
481  }
482 
483  /* all done */
485  _("# fragmentation transmissions completed"),
486  1,
487  GNUNET_NO);
488  if (NULL != fc->task)
489  {
491  fc->task = NULL;
492  }
493  return GNUNET_OK;
494 }
495 
496 
/**
 * Destroy the given fragmentation context (stop calling 'proc', free
 * resources).
 *
 * @param fc fragmentation context; freed by this call
 * @param msg_delay if non-NULL, receives a message delay value on return
 *        (NOTE(review): the assignment is on a line missing from this
 *        listing; only its fc->num_rounds argument is visible)
 * @param ack_delay if non-NULL, set to the context's current ack_delay
 */
507 void
511 {
512  if (fc->task != NULL)
514  if (NULL != ack_delay)
515  *ack_delay = fc->ack_delay;
516  if (NULL != msg_delay)
518  fc->num_rounds);
519  GNUNET_free(fc);
520 }
521 
522 
523 /* end of fragmentation.c */
unsigned int num_rounds
How many rounds of transmission have we completed so far?
uint32_t fragment_id
Unique fragment ID.
Definition: fragmentation.h:72
uint64_t acks
Bitfield, set to 1 for each unacknowledged fragment.
Definition: fragmentation.c:89
struct GNUNET_MessageHeader header
Message header.
Definition: fragmentation.h:40
int GNUNET_BANDWIDTH_tracker_consume(struct GNUNET_BANDWIDTH_Tracker *av, ssize_t size)
Notify the tracker that a certain number of bytes of bandwidth have been consumed.
Definition: bandwidth.c:402
uint32_t fragment_id
Unique fragment ID.
Definition: fragmentation.h:45
struct GNUNET_SCHEDULER_Task * task
Task performing work for the fragmenter.
uint64_t rel_value_us
The actual value.
struct GNUNET_TIME_Absolute last_round
Time we transmitted the last message of the last round.
Definition: fragmentation.c:69
int GNUNET_snprintf(char *buf, size_t size, const char *format,...)
Like snprintf, just aborts if the buffer is of insufficient size.
uint64_t bits
Bits that are being acknowledged, in big-endian.
Definition: fragmentation.h:79
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:287
struct GNUNET_STATISTICS_Handle * stats
Statistics to use.
Definition: fragmentation.c:44
struct GNUNET_TIME_Absolute GNUNET_TIME_relative_to_absolute(struct GNUNET_TIME_Relative rel)
Convert relative time to an absolute time in the future.
Definition: time.c:246
uint32_t GNUNET_CRYPTO_random_u32(enum GNUNET_CRYPTO_Quality mode, uint32_t i)
Produce a random value.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_TIME_UNIT_SECONDS
One second.
library to help fragment messages
struct GNUNET_TIME_Relative msg_delay
Current expected delay between messages.
Definition: fragmentation.c:59
void(* GNUNET_FRAGMENT_MessageProcessor)(void *cls, const struct GNUNET_MessageHeader *msg)
Function that is called with messages created by the fragmentation module.
GNUNET_FRAGMENT_MessageProcessor proc
Function to call for transmissions.
Definition: fragmentation.c:79
struct GNUNET_FRAGMENT_Context * GNUNET_FRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats, uint16_t mtu, struct GNUNET_BANDWIDTH_Tracker *tracker, struct GNUNET_TIME_Relative msg_delay, struct GNUNET_TIME_Relative ack_delay, const struct GNUNET_MessageHeader *msg, GNUNET_FRAGMENT_MessageProcessor proc, void *proc_cls)
Create a fragmentation context for the given message.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
uint64_t acks_mask
Bitfield with all possible bits for acks (used to mask the ack we get back).
Definition: fragmentation.c:95
Struct to track available bandwidth.
uint16_t total_size
Total message size of the original message.
Definition: fragmentation.h:50
#define GNUNET_NO
Definition: gnunet_common.h:78
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
Handle for the service.
uint64_t abs_value_us
The actual value.
int8_t proc_busy
GNUNET_YES if we called proc and are now waiting for GNUNET_FRAGMENT_context_transmission_done() ...
const char * GNUNET_FRAGMENT_print_ack(const struct GNUNET_MessageHeader *ack)
Convert an ACK message to a printable format suitable for logging.
struct GNUNET_BANDWIDTH_Tracker * tracker
Tracker for flow control.
Definition: fragmentation.c:49
void * proc_cls
Closure for proc.
Definition: fragmentation.c:84
void GNUNET_FRAGMENT_context_transmission_done(struct GNUNET_FRAGMENT_Context *fc)
Continuation to call from the 'proc' function after the fragment has been transmitted (and hence the ...
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
uint16_t mtu
Target fragment size.
uint32_t fragment_id
Our fragmentation ID.
Fragmentation context.
Definition: fragmentation.c:40
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1237
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
unsigned int num_transmissions
How many transmissions have we completed in this round?
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1264
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:686
static void transmit_next(void *cls)
Transmit the next fragment to the other peer.
static char buf[2048]
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
static int fh
Handle to the unique file.
const struct GNUNET_MessageHeader * msg
Message to fragment (allocated at the end of this struct).
Definition: fragmentation.c:74
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
int GNUNET_FRAGMENT_process_ack(struct GNUNET_FRAGMENT_Context *fc, const struct GNUNET_MessageHeader *msg)
Process an acknowledgement message we got from the other side (to control re-transmits).
struct GNUNET_TIME_Relative GNUNET_BANDWIDTH_tracker_get_delay(struct GNUNET_BANDWIDTH_Tracker *av, size_t size)
Compute how long we should wait until consuming size bytes of bandwidth in order to stay within the g...
Definition: bandwidth.c:458
struct GNUNET_TIME_Relative GNUNET_TIME_relative_min(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the minimum of two relative time values.
Definition: time.c:272
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
uint16_t offset
Absolute offset (in bytes) of this fragment in the original message.
Definition: fragmentation.h:56
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
int8_t wack
GNUNET_YES if we are waiting for an ACK.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_duration(struct GNUNET_TIME_Absolute whence)
Get the duration of an operation as the difference of the current time and the given start time "henc...
Definition: time.c:373
#define GNUNET_log(kind,...)
Message fragment acknowledgement.
Definition: fragmentation.h:63
Entry in list of pending tasks.
Definition: scheduler.c:131
Header for all communications.
Time for absolute times used by GNUnet, in microseconds.
struct GNUNET_TIME_Relative ack_delay
Current expected delay for ACKs.
Definition: fragmentation.c:54
#define GNUNET_YES
Definition: gnunet_common.h:77
#define MIN_ACK_DELAY
Absolute minimum delay we impose between sending and expecting ACK to arrive.
Definition: fragmentation.c:34
unsigned int next_transmission
Round-robin selector for the next transmission.
#define GNUNET_MESSAGE_TYPE_FRAGMENT
FRAGMENT of a larger message.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_at(struct GNUNET_TIME_Absolute at, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run at the specified time.
Definition: scheduler.c:1214
void GNUNET_FRAGMENT_context_destroy(struct GNUNET_FRAGMENT_Context *fc, struct GNUNET_TIME_Relative *msg_delay, struct GNUNET_TIME_Relative *ack_delay)
Destroy the given fragmentation context (stop calling 'proc', free resources).
struct GNUNET_TIME_Relative GNUNET_TIME_relative_saturating_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Saturating multiply relative time by a given factor.
Definition: time.c:499
struct GNUNET_TIME_Absolute delay_until
Next allowed transmission time.
Definition: fragmentation.c:64
No good quality of the operation is needed (i.e., random numbers can be pseudo-random).
#define GNUNET_malloc(size)
Wrapper around malloc.
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
#define GNUNET_free(ptr)
Wrapper around free.
Time for relative time used by GNUnet, in microseconds.
Header for a message fragment.
Definition: fragmentation.h:36
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956