GNUnet  0.11.x
defragmentation.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2009, 2011 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
25 #include "platform.h"
27 #include "fragmentation.h"
28 
32 struct FragTimes
33 {
38 
42  unsigned int bit;
43 };
44 
45 
53 {
58 
63 
68 
73  const struct GNUNET_MessageHeader *msg;
74 
80  struct GNUNET_TIME_Absolute last_update;
81 
87 
92  struct FragTimes frag_times[64];
93 
98  uint64_t bits;
99 
103  uint32_t fragment_id;
104 
108  unsigned int last_bit;
109 
115 
121 
125  uint16_t total_size;
126 
130  int16_t last_duplicate;
131 };
132 
133 
138 {
143 
148 
153 
157  void *cls;
158 
163 
168 
173  struct GNUNET_TIME_Relative latency;
174 
179  unsigned int num_msgs;
180 
185  unsigned int list_size;
186 
190  uint16_t mtu;
191 };
192 
193 
209  uint16_t mtu, unsigned int num_msgs,
210  void *cls,
213 {
215 
217  dc->stats = stats;
218  dc->cls = cls;
219  dc->proc = proc;
220  dc->ackp = ackp;
221  dc->num_msgs = num_msgs;
222  dc->mtu = mtu;
223  dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
224  return dc;
225 }
226 
227 
233 void
235 {
236  struct MessageContext *mc;
237 
238  while (NULL != (mc = dc->head))
239  {
240  GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, mc);
241  dc->list_size--;
242  if (NULL != mc->ack_task)
243  {
245  mc->ack_task = NULL;
246  }
247  GNUNET_free (mc);
248  }
249  GNUNET_assert (0 == dc->list_size);
250  GNUNET_free (dc);
251 }
252 
253 
259 static void
260 send_ack (void *cls)
261 {
262  struct MessageContext *mc = cls;
263  struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
264  struct FragmentAcknowledgement fa;
265 
266  mc->ack_task = NULL;
267  fa.header.size = htons (sizeof(struct FragmentAcknowledgement));
269  fa.fragment_id = htonl (mc->fragment_id);
270  fa.bits = GNUNET_htonll (mc->bits);
272  _ ("# acknowledgements sent for fragment"),
273  1,
274  GNUNET_NO);
275  mc->last_duplicate = GNUNET_NO; /* clear flag */
276  dc->ackp (dc->cls,
277  mc->fragment_id,
278  &fa.header);
279 }
280 
281 
/**
 * Least-squares fit of the model y = c1 * x (a line through the origin).
 *
 * This function is from the GNU Scientific Library, linear/fit.c,
 * Copyright (C) 2000 Brian Gough.
 *
 * Degenerate inputs are handled without dividing by zero:
 * - @a n == 0: all outputs are set to 0;
 * - all x values are 0: the slope is undetermined, @a c1 is reported
 *   as 0 and @a sumsq is the residual against that slope;
 * - fewer than two points (or an undetermined slope): there is no
 *   degree of freedom left, @a cov_11 is reported as 0.
 *
 * @param x array with the x values
 * @param xstride distance (in elements) between consecutive x values
 * @param y array with the y values
 * @param ystride distance (in elements) between consecutive y values
 * @param n number of data points
 * @param c1 set to the fitted slope
 * @param cov_11 set to the variance estimate of the slope
 * @param sumsq set to the sum of squared residuals of the fit
 */
static void
gsl_fit_mul (const double *x, const size_t xstride, const double *y,
             const size_t ystride, const size_t n, double *c1, double *cov_11,
             double *sumsq)
{
  double m_x = 0, m_y = 0, m_dx2 = 0, m_dxdy = 0;
  double denom;
  double b = 0;
  double d2 = 0;
  size_t i;

  *c1 = 0;
  *cov_11 = 0;
  *sumsq = 0;
  if (0 == n)
    return; /* nothing to fit */

  /* running means of x and y */
  for (i = 0; i < n; i++)
  {
    m_x += (x[i * xstride] - m_x) / (i + 1.0);
    m_y += (y[i * ystride] - m_y) / (i + 1.0);
  }

  /* running means of the squared deviation and the co-deviation */
  for (i = 0; i < n; i++)
  {
    const double dx = x[i * xstride] - m_x;
    const double dy = y[i * ystride] - m_y;

    m_dx2 += (dx * dx - m_dx2) / (i + 1.0);
    m_dxdy += (dx * dy - m_dxdy) / (i + 1.0);
  }

  /* In terms of y = b x */
  denom = m_x * m_x + m_dx2; /* mean of x_i^2; zero iff every x_i is 0 */
  if (0 != denom)
    b = (m_x * m_y + m_dxdy) / denom;
  *c1 = b;

  /* Compute chi^2 = \sum (y_i - b * x_i)^2 */
  for (i = 0; i < n; i++)
  {
    const double dx = x[i * xstride] - m_x;
    const double dy = y[i * ystride] - m_y;
    const double d = (m_y - b * m_x) + dy - b * dx;

    d2 += d * d;
  }
  *sumsq = d2;

  if ((n > 1) && (0 != denom))
  {
    const double s2 = d2 / (n - 1.0); /* chisq per degree of freedom */

    *cov_11 = s2 * 1.0 / (n * denom);
  }
}
336 
337 
345 static struct GNUNET_TIME_Relative
347 {
348  struct FragTimes *first;
349  size_t total = mc->frag_times_write_offset - mc->frag_times_start_offset;
350  double x[total];
351  double y[total];
352  size_t i;
353  double c1;
354  double cov11;
355  double sumsq;
356  struct GNUNET_TIME_Relative ret;
357 
358  first = &mc->frag_times[mc->frag_times_start_offset];
359  GNUNET_assert (total > 1);
360  for (i = 0; i < total; i++)
361  {
362  x[i] = (double) i;
363  y[i] = (double) (first[i].time.abs_value_us - first[0].time.abs_value_us);
364  }
365  gsl_fit_mul (x, 1, y, 1, total, &c1, &cov11, &sumsq);
366  c1 += sqrt (sumsq); /* add 1 std dev */
367  ret.rel_value_us = (uint64_t) c1;
368  if (0 == ret.rel_value_us)
369  ret = GNUNET_TIME_UNIT_MICROSECONDS; /* always at least 1 */
370  return ret;
371 }
372 
373 
379 static void
381 {
382  struct MessageContext *old;
383  struct MessageContext *pos;
384 
385  old = NULL;
386  pos = dc->head;
387  while (NULL != pos)
388  {
389  if ((old == NULL) ||
391  old = pos;
392  pos = pos->next;
393  }
394  GNUNET_assert (NULL != old);
395  GNUNET_CONTAINER_DLL_remove (dc->head, dc->tail, old);
396  dc->list_size--;
397  if (NULL != old->ack_task)
398  {
400  old->ack_task = NULL;
401  }
402  GNUNET_free (old);
403 }
404 
405 
415 int
417  const struct GNUNET_MessageHeader *msg)
418 {
419  struct MessageContext *mc;
420  const struct FragmentHeader *fh;
421  uint16_t msize;
422  uint16_t foff;
423  uint32_t fid;
424  char *mbuf;
425  unsigned int bit;
426  struct GNUNET_TIME_Absolute now;
427  struct GNUNET_TIME_Relative delay;
428  unsigned int bc;
429  unsigned int b;
430  unsigned int n;
431  unsigned int num_fragments;
432  int duplicate;
433  int last;
434 
435  if (ntohs (msg->size) < sizeof(struct FragmentHeader))
436  {
437  GNUNET_break_op (0);
438  return GNUNET_SYSERR;
439  }
440  if (ntohs (msg->size) > dc->mtu)
441  {
442  GNUNET_break_op (0);
443  return GNUNET_SYSERR;
444  }
445  fh = (const struct FragmentHeader *) msg;
446  msize = ntohs (fh->total_size);
447  if (msize < sizeof(struct GNUNET_MessageHeader))
448  {
449  GNUNET_break_op (0);
450  return GNUNET_SYSERR;
451  }
452  fid = ntohl (fh->fragment_id);
453  foff = ntohs (fh->offset);
454  if (foff >= msize)
455  {
456  GNUNET_break_op (0);
457  return GNUNET_SYSERR;
458  }
459  if (0 != (foff % (dc->mtu - sizeof(struct FragmentHeader))))
460  {
461  GNUNET_break_op (0);
462  return GNUNET_SYSERR;
463  }
465  _ ("# fragments received"),
466  1,
467  GNUNET_NO);
468  num_fragments = (ntohs (msg->size) + dc->mtu - sizeof(struct FragmentHeader)
469  - 1) / (dc->mtu - sizeof(struct FragmentHeader));
470  last = 0;
471  for (mc = dc->head; NULL != mc; mc = mc->next)
472  if (mc->fragment_id > fid)
473  last++;
474 
475  mc = dc->head;
476  while ((NULL != mc) && (fid != mc->fragment_id))
477  mc = mc->next;
478  bit = foff / (dc->mtu - sizeof(struct FragmentHeader));
479  if (bit * (dc->mtu - sizeof(struct FragmentHeader)) + ntohs (msg->size)
480  - sizeof(struct FragmentHeader) > msize)
481  {
482  /* payload extends past total message size */
483  GNUNET_break_op (0);
484  return GNUNET_SYSERR;
485  }
486  if ((NULL != mc) && (msize != mc->total_size))
487  {
488  /* inconsistent message size */
489  GNUNET_break_op (0);
490  return GNUNET_SYSERR;
491  }
492  now = GNUNET_TIME_absolute_get ();
493  if (NULL == mc)
494  {
495  mc = GNUNET_malloc (sizeof(struct MessageContext) + msize);
496  mc->msg = (const struct GNUNET_MessageHeader *) &mc[1];
497  mc->dc = dc;
498  mc->total_size = msize;
499  mc->fragment_id = fid;
500  mc->last_update = now;
501  n = (msize + dc->mtu - sizeof(struct FragmentHeader) - 1) / (dc->mtu
502  - sizeof(struct
503  FragmentHeader));
504  if (n == 64)
505  mc->bits = UINT64_MAX; /* set all 64 bit */
506  else
507  mc->bits = (1LLU << n) - 1; /* set lowest 'bits' bit */
508  if (dc->list_size >= dc->num_msgs)
509  discard_oldest_mc (dc);
510  GNUNET_CONTAINER_DLL_insert (dc->head,
511  dc->tail,
512  mc);
513  dc->list_size++;
514  }
515 
516  /* copy data to 'mc' */
517  if (0 != (mc->bits & (1LLU << bit)))
518  {
519  mc->bits -= 1LLU << bit;
520  mbuf = (char *) &mc[1];
521  GNUNET_memcpy (&mbuf[bit * (dc->mtu - sizeof(struct FragmentHeader))],
522  &fh[1],
523  ntohs (msg->size) - sizeof(struct FragmentHeader));
524  mc->last_update = now;
525  if (bit < mc->last_bit)
527  mc->last_bit = bit;
528  mc->frag_times[mc->frag_times_write_offset].time = now;
531  duplicate = GNUNET_NO;
532  }
533  else
534  {
535  duplicate = GNUNET_YES;
536  GNUNET_STATISTICS_update (dc->stats,
537  _ ("# duplicate fragments received"),
538  1,
539  GNUNET_NO);
540  }
541 
542  /* count number of missing fragments after the current one */
543  bc = 0;
544  for (b = bit; b < 64; b++)
545  if (0 != (mc->bits & (1LLU << b)))
546  bc++;
547  else
548  bc = 0;
549 
550  /* notify about complete message */
551  if ((GNUNET_NO == duplicate) &&
552  (0 == mc->bits))
553  {
554  GNUNET_STATISTICS_update (dc->stats,
555  _ ("# messages defragmented"),
556  1,
557  GNUNET_NO);
558  /* message complete, notify! */
559  dc->proc (dc->cls, mc->msg);
560  }
561  /* send ACK */
563  {
564  dc->latency = estimate_latency (mc);
565  }
566  delay = GNUNET_TIME_relative_saturating_multiply (dc->latency,
567  bc + 1);
568  if ((last + fid == num_fragments) ||
569  (0 == mc->bits) ||
570  (GNUNET_YES == duplicate))
571  {
572  /* message complete or duplicate or last missing fragment in
573  linear sequence; ACK now! */
574  delay = GNUNET_TIME_UNIT_ZERO;
575  }
576  if (NULL != mc->ack_task)
579  &send_ack,
580  mc);
581  if (GNUNET_YES == duplicate)
582  {
584  return GNUNET_NO;
585  }
586  return GNUNET_YES;
587 }
588 
589 
590 /* end of defragmentation.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
uint32_t fragment_id
Unique fragment ID.
Definition: fragmentation.h:74
struct MessageContext * tail
Tail of list of messages we're defragmenting.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
uint32_t fragment_id
Unique fragment ID.
Definition: fragmentation.h:46
uint64_t rel_value_us
The actual value.
#define GNUNET_CONTAINER_DLL_insert(head, tail, element)
Insert an element at the head of a DLL.
unsigned int frag_times_start_offset
For the current ACK round, which is the first relevant offset in frag_times?
uint64_t bits
Bits that are being acknowledged, in big-endian.
Definition: fragmentation.h:81
struct GNUNET_SCHEDULER_Task * ack_task
Task scheduled for transmitting the next ACK to the other peer.
static void send_ack(void *cls)
Send acknowledgement to the other peer now.
struct GNUNET_DEFRAGMENT_Context * GNUNET_DEFRAGMENT_context_create(struct GNUNET_STATISTICS_Handle *stats, uint16_t mtu, unsigned int num_msgs, void *cls, GNUNET_FRAGMENT_MessageProcessor proc, GNUNET_DEFRAGMENT_AckProcessor ackp)
Create a defragmentation context.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_TIME_UNIT_SECONDS
One second.
static void gsl_fit_mul(const double *x, const size_t xstride, const double *y, const size_t ystride, const size_t n, double *c1, double *cov_11, double *sumsq)
This function is from the GNU Scientific Library, linear/fit.c, Copyright (C) 2000 Brian Gough...
library to help fragment messages
void(* GNUNET_FRAGMENT_MessageProcessor)(void *cls, const struct GNUNET_MessageHeader *msg)
Function that is called with messages created by the fragmentation module.
uint64_t bits
Which fragments have we gotten yet? bits that are 1 indicate missing fragments.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
uint16_t total_size
Total message size of the original message.
Definition: fragmentation.h:51
#define GNUNET_NO
Definition: gnunet_common.h:78
#define GNUNET_new(type)
Allocate a struct or union of the given type.
Timestamps for fragments.
void * cls
Closure for proc and ackp.
struct GNUNET_MessageHeader header
Message header.
Definition: fragmentation.h:69
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
Handle for the service.
struct GNUNET_TIME_Absolute last_update
Last time we received any update for this message (least-recently updated message will be discarded i...
#define GNUNET_MESSAGE_TYPE_FRAGMENT_ACK
Acknowledgement of a FRAGMENT of a larger message.
uint64_t abs_value_us
The actual value.
uint16_t total_size
Total size of the message that we are assembling.
unsigned int list_size
Current number of messages in the 'struct MessageContext' DLL (smaller or equal to 'num_msgs')...
struct GNUNET_TIME_Absolute time
The time the fragment was received.
GNUNET_DEFRAGMENT_AckProcessor ackp
Function to call with acknowledgements.
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
unsigned int num_msgs
How many fragmented messages do we defragment at most at the same time?
struct MessageContext * head
Head of list of messages we're defragmenting.
struct MessageContext * prev
This is a DLL.
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1253
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
Information we keep for one message that is being assembled.
void GNUNET_DEFRAGMENT_context_destroy(struct GNUNET_DEFRAGMENT_Context *dc)
Destroy the given defragmentation context.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
struct FragTimes frag_times[64]
When did we receive which fragment? Used to calculate the time we should send the ACK...
struct GNUNET_STATISTICS_Handle * stats
Handle to the statistics service.
unsigned int last_bit
Which 'bit' did the last fragment we received correspond to?
static int fh
Handle to the unique file.
uint32_t fragment_id
Unique ID for this message.
static struct GNUNET_TESTBED_Controller * mc
Handle to the master controller.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_get(void)
Get the current time.
Definition: time.c:118
int16_t last_duplicate
Was the last fragment we got a duplicate?
struct GNUNET_DEFRAGMENT_Context * dc
Associated defragmentation context.
Defragmentation context (one per connection).
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
struct GNUNET_STATISTICS_Handle * stats
For statistics.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
uint16_t offset
Absolute offset (in bytes) of this fragment in the original message.
Definition: fragmentation.h:57
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
struct GNUNET_TIME_Relative latency
Running average of the latency (delay between messages) for this connection.
static void discard_oldest_mc(struct GNUNET_DEFRAGMENT_Context *dc)
Discard the message context that was inactive for the longest time.
struct MessageContext * next
This is a DLL.
GNUNET_FRAGMENT_MessageProcessor proc
Function to call with defragmented messages.
unsigned int bit
Number of the bit for the fragment (in [0,..,63]).
Message fragment acknowledgement.
Definition: fragmentation.h:64
Entry in list of pending tasks.
Definition: scheduler.c:134
const struct GNUNET_MessageHeader * msg
Pointer to the assembled message, allocated at the end of this struct.
static struct GNUNET_TIME_Relative estimate_latency(struct MessageContext *mc)
Estimate the latency between messages based on the most recent message time stamps.
Header for all communications.
int GNUNET_DEFRAGMENT_process_fragment(struct GNUNET_DEFRAGMENT_Context *dc, const struct GNUNET_MessageHeader *msg)
We have received a fragment.
Time for absolute times used by GNUnet, in microseconds.
#define GNUNET_YES
Definition: gnunet_common.h:77
uint16_t mtu
Maximum message size for each fragment.
unsigned int frag_times_write_offset
Which offset should we write the next frag value into in the frag_times array? All smaller entries ar...
#define GNUNET_TIME_UNIT_MICROSECONDS
One microsecond, our basic time unit.
struct GNUNET_TIME_Relative GNUNET_TIME_relative_saturating_multiply(struct GNUNET_TIME_Relative rel, unsigned long long factor)
Saturating multiply relative time by a given factor.
Definition: time.c:501
void(* GNUNET_DEFRAGMENT_AckProcessor)(void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
Function that is called with acknowledgement messages created by the fragmentation module...
#define GNUNET_malloc(size)
Wrapper around malloc.
static struct GNUNET_FS_DownloadContext * dc
#define GNUNET_free(ptr)
Wrapper around free.
Time for relative time used by GNUnet, in microseconds.
Header for a message fragment.
Definition: fragmentation.h:36
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:966