GNUnet  0.10.x
mst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19 */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 
30 
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36 
37 #define LOG(kind,...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38 
39 
44 {
45 
50 
54  void *cb_cls;
55 
59  size_t curr_buf;
60 
64  size_t off;
65 
69  size_t pos;
70 
75 
76 };
77 
78 
88  void *cb_cls)
89 {
91 
95  ret->cb = cb;
96  ret->cb_cls = cb_cls;
97  return ret;
98 }
99 
100 
115 int
117  const char *buf,
118  size_t size,
119  int purge,
120  int one_shot)
121 {
122  const struct GNUNET_MessageHeader *hdr;
123  size_t delta;
124  uint16_t want;
125  char *ibuf;
126  int need_align;
127  unsigned long offset;
128  int ret;
129  int cbret;
130 
131  GNUNET_assert (mst->off <= mst->pos);
132  GNUNET_assert (mst->pos <= mst->curr_buf);
134  "MST receives %u bytes with %u bytes already in private buffer\n",
135  (unsigned int) size,
136  (unsigned int) (mst->pos - mst->off));
137  ret = GNUNET_OK;
138  ibuf = (char *) mst->hdr;
139  while (mst->pos > 0)
140  {
141 do_align:
142  GNUNET_assert (mst->pos >= mst->off);
143  if ((mst->curr_buf - mst->off < sizeof (struct GNUNET_MessageHeader)) ||
144  (0 != (mst->off % ALIGN_FACTOR)))
145  {
146  /* need to align or need more space */
147  mst->pos -= mst->off;
148  memmove (ibuf,
149  &ibuf[mst->off],
150  mst->pos);
151  mst->off = 0;
152  }
153  if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
154  {
155  delta
156  = GNUNET_MIN (sizeof (struct GNUNET_MessageHeader)
157  - (mst->pos - mst->off),
158  size);
159  GNUNET_memcpy (&ibuf[mst->pos],
160  buf,
161  delta);
162  mst->pos += delta;
163  buf += delta;
164  size -= delta;
165  }
166  if (mst->pos - mst->off < sizeof (struct GNUNET_MessageHeader))
167  {
168  if (purge)
169  {
170  mst->off = 0;
171  mst->pos = 0;
172  }
173  return GNUNET_OK;
174  }
175  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
176  want = ntohs (hdr->size);
177  if (want < sizeof (struct GNUNET_MessageHeader))
178  {
179  GNUNET_break_op (0);
180  return GNUNET_SYSERR;
181  }
182  if ( (mst->curr_buf - mst->off < want) &&
183  (mst->off > 0) )
184  {
185  /* can get more space by moving */
186  mst->pos -= mst->off;
187  memmove (ibuf,
188  &ibuf[mst->off],
189  mst->pos);
190  mst->off = 0;
191  }
192  if (mst->curr_buf < want)
193  {
194  /* need to get more space by growing buffer */
195  GNUNET_assert (0 == mst->off);
196  mst->hdr = GNUNET_realloc (mst->hdr,
197  want);
198  ibuf = (char *) mst->hdr;
199  mst->curr_buf = want;
200  }
201  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
202  if (mst->pos - mst->off < want)
203  {
204  delta = GNUNET_MIN (want - (mst->pos - mst->off),
205  size);
206  GNUNET_assert (mst->pos + delta <= mst->curr_buf);
207  GNUNET_memcpy (&ibuf[mst->pos],
208  buf,
209  delta);
210  mst->pos += delta;
211  buf += delta;
212  size -= delta;
213  }
214  if (mst->pos - mst->off < want)
215  {
216  if (purge)
217  {
218  mst->off = 0;
219  mst->pos = 0;
220  }
221  return GNUNET_OK;
222  }
223  if (one_shot == GNUNET_SYSERR)
224  {
225  /* cannot call callback again, but return value saying that
226  * we have another full message in the buffer */
227  ret = GNUNET_NO;
228  goto copy;
229  }
230  if (one_shot == GNUNET_YES)
231  one_shot = GNUNET_SYSERR;
232  mst->off += want;
233  if (GNUNET_OK !=
234  (cbret = mst->cb (mst->cb_cls,
235  hdr)))
236  {
237  if (GNUNET_SYSERR == cbret)
239  "Failure processing message of type %u and size %u\n",
240  ntohs (hdr->type),
241  ntohs (hdr->size));
242  return GNUNET_SYSERR;
243  }
244  if (mst->off == mst->pos)
245  {
246  /* reset to beginning of buffer, it's free right now! */
247  mst->off = 0;
248  mst->pos = 0;
249  }
250  }
251  GNUNET_assert (0 == mst->pos);
252  while (size > 0)
253  {
255  "Server-mst has %u bytes left in inbound buffer\n",
256  (unsigned int) size);
257  if (size < sizeof (struct GNUNET_MessageHeader))
258  break;
259  offset = (unsigned long) buf;
260  need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
261  if (GNUNET_NO == need_align)
262  {
263  /* can try to do zero-copy and process directly from original buffer */
264  hdr = (const struct GNUNET_MessageHeader *) buf;
265  want = ntohs (hdr->size);
266  if (want < sizeof (struct GNUNET_MessageHeader))
267  {
268  GNUNET_break_op (0);
269  mst->off = 0;
270  return GNUNET_SYSERR;
271  }
272  if (size < want)
273  break; /* or not: buffer incomplete, so copy to private buffer... */
274  if (one_shot == GNUNET_SYSERR)
275  {
276  /* cannot call callback again, but return value saying that
277  * we have another full message in the buffer */
278  ret = GNUNET_NO;
279  goto copy;
280  }
281  if (one_shot == GNUNET_YES)
282  one_shot = GNUNET_SYSERR;
283  if (GNUNET_OK !=
284  (cbret = mst->cb (mst->cb_cls,
285  hdr)))
286  {
287  if (GNUNET_SYSERR == cbret)
289  "Failure processing message of type %u and size %u\n",
290  ntohs (hdr->type),
291  ntohs (hdr->size));
292  return GNUNET_SYSERR;
293  }
294  buf += want;
295  size -= want;
296  }
297  else
298  {
299  /* need to copy to private buffer to align;
300  * yes, we go a bit more spagetti than usual here */
301  goto do_align;
302  }
303  }
304 copy:
305  if ((size > 0) && (!purge))
306  {
307  if (size + mst->pos > mst->curr_buf)
308  {
309  mst->hdr = GNUNET_realloc (mst->hdr,
310  size + mst->pos);
311  ibuf = (char *) mst->hdr;
312  mst->curr_buf = size + mst->pos;
313  }
314  GNUNET_assert (size + mst->pos <= mst->curr_buf);
315  GNUNET_memcpy (&ibuf[mst->pos],
316  buf,
317  size);
318  mst->pos += size;
319  }
320  if (purge)
321  {
322  mst->off = 0;
323  mst->pos = 0;
324  }
326  "Server-mst leaves %u bytes in private buffer\n",
327  (unsigned int) (mst->pos - mst->off));
328  return ret;
329 }
330 
331 
346 int
348  struct GNUNET_NETWORK_Handle *sock,
349  int purge,
350  int one_shot)
351 {
352  ssize_t ret;
353  size_t left;
354  char *buf;
355 
356  left = mst->curr_buf - mst->pos;
357  buf = (char *) mst->hdr;
358  ret = GNUNET_NETWORK_socket_recv (sock,
359  &buf[mst->pos],
360  left);
361  if (-1 == ret)
362  {
363  if ( (EAGAIN == errno) ||
364  (EINTR == errno) )
365  return GNUNET_OK;
367  "recv");
368  return GNUNET_SYSERR;
369  }
370  if (0 == ret)
371  {
372  /* other side closed connection, treat as error */
373  return GNUNET_SYSERR;
374  }
375  mst->pos += ret;
376  return GNUNET_MST_from_buffer (mst,
377  NULL,
378  0,
379  purge,
380  one_shot);
381 }
382 
383 
395 int
397  int one_shot)
398 {
399  return GNUNET_MST_from_buffer (mst,
400  NULL,
401  0,
402  GNUNET_NO,
403  one_shot);
404 }
405 
406 
412 void
414 {
415  GNUNET_free (mst->hdr);
416  GNUNET_free (mst);
417 }
418 
419 
420 
421 /* end of mst.c */
size_t off
How many bytes in buffer have we already processed?
Definition: mst.c:64
static struct GNUNET_TIME_Relative delta
Definition: speedup.c:35
void * cb_cls
Closure for cb.
Definition: mst.c:54
static size_t do_align(size_t start_position, size_t end_position)
Given the start and end position of a block of data, return the end position of that data after align...
Definition: fs_directory.c:484
ssize_t GNUNET_NETWORK_socket_recv(const struct GNUNET_NETWORK_Handle *desc, void *buffer, size_t length)
Read data from a connected socket (always non-blocking).
Definition: network.c:894
#define ALIGN_FACTOR
Definition: mst.c:34
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
int GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, int one_shot)
Obtain the next message from the mst, assuming that there are more unprocessed messages in the intern...
Definition: mst.c:396
#define GNUNET_NO
Definition: gnunet_common.h:81
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:78
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static int ret
Final status code.
Definition: gnunet-arm.c:89
int(* GNUNET_MessageTokenizerCallback)(void *cls, const struct GNUNET_MessageHeader *message)
Functions with this signature are called whenever a complete message is received by the tokenizer...
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level 'level' that indicates a failure of the command 'cmd' with the mess...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_memcpy(dst, src, n)
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:413
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_realloc(ptr, size)
Wrapper around realloc.
Handle to a message stream tokenizer.
Definition: mst.c:43
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:83
static char buf[2048]
#define GNUNET_MIN_MESSAGE_SIZE
Smallest supported message.
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:87
int GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:116
#define GNUNET_SYSERR
Definition: gnunet_common.h:79
static unsigned int size
Size of the "table".
Definition: peer.c:67
struct GNUNET_MessageHeader * hdr
Beginning of the buffer.
Definition: mst.c:74
size_t pos
How many bytes in buffer are valid right now?
Definition: mst.c:69
#define GNUNET_log(kind,...)
int GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, struct GNUNET_NETWORK_Handle *sock, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:347
#define LOG(kind,...)
Definition: mst.c:37
handle to a socket
Definition: network.c:46
size_t curr_buf
Size of the buffer (starting at hdr).
Definition: mst.c:59
Header for all communications.
#define GNUNET_YES
Definition: gnunet_common.h:80
GNUNET_MessageTokenizerCallback cb
Function to call on completed messages.
Definition: mst.c:49
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.