GNUnet  0.11.x
mst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 
30 
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36 
37 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38 
39 
/* Handle to a message stream tokenizer.
 * NOTE(review): the struct tag line and the `cb` and `hdr` member lines are
 * absent from this listing (its line numbering skips them); both members are
 * used by the functions below. */
44 {
49 
/* Closure for cb. */
53  void *cb_cls;
54 
/* Size of the buffer (starting at hdr). */
58  size_t curr_buf;
59 
/* How many bytes in buffer have we already processed? */
63  size_t off;
64 
/* How many bytes in buffer are valid right now? */
68  size_t pos;
69 
74 };
75 
76 
/* Create a message stream tokenizer: the visible part of the body stores the
 * callback `cb` and its closure `cb_cls` in the new handle and returns it.
 * NOTE(review): the start of the signature, the declaration of `ret` and the
 * statements that allocate and initialise it are absent from this listing
 * (its line numbering skips them) -- consult the original file for them. */
86  void *cb_cls)
87 {
89 
93  ret->cb = cb;
94  ret->cb_cls = cb_cls;
95  return ret;
96 }
97 
98 
/* Add incoming data to the receive buffer and call the callback for all
 * complete messages.
 *
 * Parameters (as used below):
 *   mst      - tokenizer handle; its private buffer (hdr / curr_buf) and the
 *              cursors off / pos are read and updated.
 *   buf      - input bytes; advanced as bytes are consumed.
 *   size     - number of bytes remaining at buf.
 *   purge    - if non-zero, the private buffer cursors are reset to 0 before
 *              returning on the incomplete-message paths and at the end, and
 *              leftover input is not copied into the private buffer.
 *   one_shot - if GNUNET_YES, it is switched to GNUNET_SYSERR after the first
 *              callback; a further complete message is then not delivered and
 *              GNUNET_NO is returned instead.
 *
 * Returns GNUNET_OK normally, GNUNET_NO if one_shot stopped delivery while
 * another complete message was available, and GNUNET_SYSERR if a header
 * announces a size smaller than the header itself or if the callback returns
 * anything other than GNUNET_OK.
 *
 * NOTE(review): the line with the function name and first parameter, and the
 * first line of each logging call, are absent from this listing (its line
 * numbering skips them). */
113 int
115  const char *buf,
116  size_t size,
117  int purge,
118  int one_shot)
119 {
120  const struct GNUNET_MessageHeader *hdr;
121  size_t delta;
122  uint16_t want;
123  char *ibuf;
124  int need_align;
125  unsigned long offset;
126  int ret;
127  int cbret;
128 
/* Cursor invariant on entry: off <= pos <= curr_buf. */
129  GNUNET_assert (mst->off <= mst->pos);
130  GNUNET_assert (mst->pos <= mst->curr_buf);
132  "MST receives %u bytes with %u bytes already in private buffer\n",
133  (unsigned int) size,
134  (unsigned int) (mst->pos - mst->off));
135  ret = GNUNET_OK;
136  ibuf = (char *) mst->hdr;
/* Phase 1: while the private buffer holds data, complete and deliver
 * messages from it, topping it up from buf as needed. */
137  while (mst->pos > 0)
138  {
/* Also entered by goto from the second loop when buf is not aligned. */
139 do_align:
140  GNUNET_assert (mst->pos >= mst->off);
141  if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
142  (0 != (mst->off % ALIGN_FACTOR)))
143  {
144  /* need to align or need more space */
145  mst->pos -= mst->off;
146  memmove (ibuf,
147  &ibuf[mst->off],
148  mst->pos);
149  mst->off = 0;
150  }
/* Not even a full header buffered yet: take as many header bytes as
 * the input can supply. */
151  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
152  {
153  delta
154  = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
155  - (mst->pos - mst->off),
156  size);
157  GNUNET_memcpy (&ibuf[mst->pos],
158  buf,
159  delta);
160  mst->pos += delta;
161  buf += delta;
162  size -= delta;
163  }
/* Header still incomplete: input is exhausted, wait for more data. */
164  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
165  {
166  if (purge)
167  {
168  mst->off = 0;
169  mst->pos = 0;
170  }
171  return GNUNET_OK;
172  }
173  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
/* The size field is converted from network byte order. */
174  want = ntohs (hdr->size);
175  if (want < sizeof(struct GNUNET_MessageHeader))
176  {
177  GNUNET_break_op (0);
178  return GNUNET_SYSERR;
179  }
180  if ((mst->curr_buf - mst->off < want) &&
181  (mst->off > 0))
182  {
183  /* can get more space by moving */
184  mst->pos -= mst->off;
185  memmove (ibuf,
186  &ibuf[mst->off],
187  mst->pos);
188  mst->off = 0;
189  }
190  if (mst->curr_buf < want)
191  {
192  /* need to get more space by growing buffer */
193  GNUNET_assert (0 == mst->off);
194  mst->hdr = GNUNET_realloc (mst->hdr,
195  want);
/* The buffer pointer may have changed; refresh the local alias. */
196  ibuf = (char *) mst->hdr;
197  mst->curr_buf = want;
198  }
/* Recompute hdr: the buffer may have been moved or reallocated above. */
199  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
/* Fill in the message body from the input, up to `want` bytes total. */
200  if (mst->pos - mst->off < want)
201  {
202  delta = GNUNET_MIN (want - (mst->pos - mst->off),
203  size);
204  GNUNET_assert (mst->pos + delta <= mst->curr_buf);
205  GNUNET_memcpy (&ibuf[mst->pos],
206  buf,
207  delta);
208  mst->pos += delta;
209  buf += delta;
210  size -= delta;
211  }
/* Message still incomplete: input is exhausted, wait for more data. */
212  if (mst->pos - mst->off < want)
213  {
214  if (purge)
215  {
216  mst->off = 0;
217  mst->pos = 0;
218  }
219  return GNUNET_OK;
220  }
221  if (one_shot == GNUNET_SYSERR)
222  {
223  /* cannot call callback again, but return value saying that
224  * we have another full message in the buffer */
225  ret = GNUNET_NO;
226  goto copy;
227  }
228  if (one_shot == GNUNET_YES)
229  one_shot = GNUNET_SYSERR;
/* The message counts as consumed before the callback runs, so off has
 * already advanced past it even if the callback fails. */
230  mst->off += want;
231  if (GNUNET_OK !=
232  (cbret = mst->cb (mst->cb_cls,
233  hdr)))
234  {
/* Only a GNUNET_SYSERR result is logged; any non-OK result aborts. */
235  if (GNUNET_SYSERR == cbret)
237  "Failure processing message of type %u and size %u\n",
238  ntohs (hdr->type),
239  ntohs (hdr->size));
240  return GNUNET_SYSERR;
241  }
242  if (mst->off == mst->pos)
243  {
244  /* reset to beginning of buffer, it's free right now! */
245  mst->off = 0;
246  mst->pos = 0;
247  }
248  }
/* Phase 2: the private buffer is empty; process messages straight from
 * the caller's buffer where alignment permits. */
249  GNUNET_assert (0 == mst->pos);
250  while (size > 0)
251  {
253  "Server-mst has %u bytes left in inbound buffer\n",
254  (unsigned int) size);
255  if (size < sizeof(struct GNUNET_MessageHeader))
256  break;
/* Alignment is tested on the numeric value of the pointer. */
257  offset = (unsigned long) buf;
258  need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
259  if (GNUNET_NO == need_align)
260  {
261  /* can try to do zero-copy and process directly from original buffer */
262  hdr = (const struct GNUNET_MessageHeader *) buf;
263  want = ntohs (hdr->size);
264  if (want < sizeof(struct GNUNET_MessageHeader))
265  {
266  GNUNET_break_op (0);
267  mst->off = 0;
268  return GNUNET_SYSERR;
269  }
270  if (size < want)
271  break; /* or not: buffer incomplete, so copy to private buffer... */
272  if (one_shot == GNUNET_SYSERR)
273  {
274  /* cannot call callback again, but return value saying that
275  * we have another full message in the buffer */
276  ret = GNUNET_NO;
277  goto copy;
278  }
279  if (one_shot == GNUNET_YES)
280  one_shot = GNUNET_SYSERR;
281  if (GNUNET_OK !=
282  (cbret = mst->cb (mst->cb_cls,
283  hdr)))
284  {
285  if (GNUNET_SYSERR == cbret)
287  "Failure processing message of type %u and size %u\n",
288  ntohs (hdr->type),
289  ntohs (hdr->size));
290  return GNUNET_SYSERR;
291  }
292  buf += want;
293  size -= want;
294  }
295  else
296  {
297  /* need to copy to private buffer to align;
298  * yes, we go a bit more spaghetti than usual here */
299  goto do_align;
300  }
301  }
/* Phase 3: keep any unprocessed input in the private buffer (unless the
 * caller asked to purge), growing the buffer if necessary. */
302 copy:
303  if ((size > 0) && (! purge))
304  {
305  if (size + mst->pos > mst->curr_buf)
306  {
307  mst->hdr = GNUNET_realloc (mst->hdr,
308  size + mst->pos);
309  ibuf = (char *) mst->hdr;
310  mst->curr_buf = size + mst->pos;
311  }
312  GNUNET_assert (size + mst->pos <= mst->curr_buf);
313  GNUNET_memcpy (&ibuf[mst->pos],
314  buf,
315  size);
316  mst->pos += size;
317  }
318  if (purge)
319  {
320  mst->off = 0;
321  mst->pos = 0;
322  }
324  "Server-mst leaves %u bytes in private buffer\n",
325  (unsigned int) (mst->pos - mst->off));
326  return ret;
327 }
328 
329 
/* Add incoming data to the receive buffer and call the callback for all
 * complete messages.
 *
 * Receives from `sock` directly into the unused tail of the tokenizer's
 * private buffer, then runs GNUNET_MST_from_buffer with no extra input so
 * that the newly buffered bytes are tokenized. `purge` and `one_shot` are
 * passed through unchanged.
 *
 * Returns GNUNET_OK if the receive failed with EAGAIN or EINTR,
 * GNUNET_SYSERR on any other receive error or when 0 bytes were received,
 * and otherwise the result of GNUNET_MST_from_buffer.
 *
 * NOTE(review): the line with the function name and first parameter, and
 * the first line of the logging call, are absent from this listing. */
344 int
346  struct GNUNET_NETWORK_Handle *sock,
347  int purge,
348  int one_shot)
349 {
350  ssize_t ret;
351  size_t left;
352  char *buf;
353 
/* Free space after the valid bytes in the private buffer.
 * NOTE(review): if pos == curr_buf this is 0 and the receive is asked for
 * zero bytes; a 0 result would then presumably be taken for a closed
 * connection below -- confirm against GNUNET_NETWORK_socket_recv. */
354  left = mst->curr_buf - mst->pos;
355  buf = (char *) mst->hdr;
356  ret = GNUNET_NETWORK_socket_recv (sock,
357  &buf[mst->pos],
358  left);
359  if (-1 == ret)
360  {
/* These two errno values are not treated as failures. */
361  if ((EAGAIN == errno) ||
362  (EINTR == errno))
363  return GNUNET_OK;
365  "recv");
366  return GNUNET_SYSERR;
367  }
368  if (0 == ret)
369  {
370  /* other side closed connection, treat as error */
371  return GNUNET_SYSERR;
372  }
/* Account for the received bytes, then tokenize from the private buffer
 * only (no additional input buffer). */
373  mst->pos += ret;
374  return GNUNET_MST_from_buffer (mst,
375  NULL,
376  0,
377  purge,
378  one_shot);
379 }
380 
381 
/* Obtain the next message from the mst: runs GNUNET_MST_from_buffer with no
 * new input (NULL buffer, size 0) and with purge set to GNUNET_NO, so only
 * data already held in the tokenizer's private buffer is processed.
 * `one_shot` is passed through and the result is returned unchanged.
 *
 * NOTE(review): the line with the function name and first parameter is
 * absent from this listing. */
393 int
395  int one_shot)
396 {
397  return GNUNET_MST_from_buffer (mst,
398  NULL,
399  0,
400  GNUNET_NO,
401  one_shot);
402 }
403 
404 
/* Destroys a tokenizer: frees the private buffer (mst->hdr) and then the
 * handle itself. The handle must not be used afterwards.
 *
 * NOTE(review): the line with the function name and parameter is absent
 * from this listing. */
410 void
412 {
413  GNUNET_free (mst->hdr);
414  GNUNET_free (mst);
415 }
416 
417 
418 /* end of mst.c */
size_t off
How many bytes in buffer have we already processed?
Definition: mst.c:63
static struct GNUNET_TIME_Relative delta
Definition: speedup.c:35
void * cb_cls
Closure for cb.
Definition: mst.c:53
static size_t do_align(size_t start_position, size_t end_position)
Given the start and end position of a block of data, return the end position of that data after align...
Definition: fs_directory.c:487
ssize_t GNUNET_NETWORK_socket_recv(const struct GNUNET_NETWORK_Handle *desc, void *buffer, size_t length)
Read data from a connected socket (always non-blocking).
Definition: network.c:787
#define ALIGN_FACTOR
Definition: mst.c:34
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
int GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, int one_shot)
Obtain the next message from the mst, assuming that there are more unprocessed messages in the intern...
Definition: mst.c:394
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
#define GNUNET_NO
Definition: gnunet_common.h:78
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
int(* GNUNET_MessageTokenizerCallback)(void *cls, const struct GNUNET_MessageHeader *message)
Functions with this signature are called whenever a complete message is received by the tokenizer...
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level 'level' that indicates a failure of the command 'cmd' with the mess...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:411
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_realloc(ptr, size)
Wrapper around realloc.
Handle to a message stream tokenizer.
Definition: mst.c:43
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
static char buf[2048]
#define GNUNET_MIN_MESSAGE_SIZE
Smallest supported message.
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:85
int GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:114
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:67
struct GNUNET_MessageHeader * hdr
Beginning of the buffer.
Definition: mst.c:73
size_t pos
How many bytes in buffer are valid right now?
Definition: mst.c:68
#define GNUNET_log(kind,...)
int GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, struct GNUNET_NETWORK_Handle *sock, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:345
#define LOG(kind,...)
Definition: mst.c:37
handle to a socket
Definition: network.c:52
size_t curr_buf
Size of the buffer (starting at hdr).
Definition: mst.c:58
Header for all communications.
#define GNUNET_YES
Definition: gnunet_common.h:77
GNUNET_MessageTokenizerCallback cb
Function to call on completed messages.
Definition: mst.c:48
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.