GNUnet  0.19.5
mst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
28 #include "platform.h"
29 #include "gnunet_util_lib.h"
30 
31 
32 #if HAVE_UNALIGNED_64_ACCESS
33 #define ALIGN_FACTOR 4
34 #else
35 #define ALIGN_FACTOR 8
36 #endif
37 
38 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
39 
40 
45 {
50 
54  void *cb_cls;
55 
59  size_t curr_buf;
60 
64  size_t off;
65 
69  size_t pos;
70 
75 };
76 
77 
87  void *cb_cls)
88 {
90 
93  ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
94  ret->cb = cb;
95  ret->cb_cls = cb_cls;
96  return ret;
97 }
98 
99 
102  const char *buf,
103  size_t size,
104  int purge,
105  int one_shot)
106 {
107  const struct GNUNET_MessageHeader *hdr;
108  size_t delta;
109  uint16_t want;
110  char *ibuf;
111  int ret;
112  int cbret;
113 
114  GNUNET_assert (mst->off <= mst->pos);
115  GNUNET_assert (mst->pos <= mst->curr_buf);
117  "MST receives %u bytes with %u (%u/%u) bytes already in private buffer\n",
118  (unsigned int) size,
119  (unsigned int) (mst->pos - mst->off),
120  (unsigned int) mst->pos,
121  (unsigned int) mst->off);
122  ret = GNUNET_OK;
123  ibuf = (char *) mst->hdr;
124  while (mst->pos > 0)
125  {
126 do_align:
127  GNUNET_assert (mst->pos >= mst->off);
128  if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
129  (0 != (mst->off % ALIGN_FACTOR)))
130  {
131  /* need to align or need more space */
132  mst->pos -= mst->off;
133  memmove (ibuf,
134  &ibuf[mst->off],
135  mst->pos);
136  mst->off = 0;
137  }
138  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
139  {
140  delta
141  = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
142  - (mst->pos - mst->off),
143  size);
144  GNUNET_memcpy (&ibuf[mst->pos],
145  buf,
146  delta);
147  mst->pos += delta;
148  buf += delta;
149  size -= delta;
150  }
151  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
152  {
153  if (purge)
154  {
155  mst->off = 0;
156  mst->pos = 0;
157  }
158  return GNUNET_OK;
159  }
160  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
161  want = ntohs (hdr->size);
163  "We want to read message of size %u\n",
164  want);
165  if (want < sizeof(struct GNUNET_MessageHeader))
166  {
167  GNUNET_break_op (0);
168  return GNUNET_SYSERR;
169  }
170  if ((mst->curr_buf - mst->off < want) &&
171  (mst->off > 0))
172  {
173  /* can get more space by moving */
174  mst->pos -= mst->off;
175  memmove (ibuf,
176  &ibuf[mst->off],
177  mst->pos);
178  mst->off = 0;
179  }
180  if (mst->curr_buf < want)
181  {
182  /* need to get more space by growing buffer */
183  GNUNET_assert (0 == mst->off);
184  mst->hdr = GNUNET_realloc (mst->hdr,
185  want);
186  ibuf = (char *) mst->hdr;
187  mst->curr_buf = want;
188  }
189  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
190  if (mst->pos - mst->off < want)
191  {
192  delta = GNUNET_MIN (want - (mst->pos - mst->off),
193  size);
194  GNUNET_assert (mst->pos + delta <= mst->curr_buf);
195  GNUNET_memcpy (&ibuf[mst->pos],
196  buf,
197  delta);
198  mst->pos += delta;
199  buf += delta;
200  size -= delta;
201  }
202  if (mst->pos - mst->off < want)
203  {
204  if (purge)
205  {
206  mst->off = 0;
207  mst->pos = 0;
208  }
209  return GNUNET_OK;
210  }
211  if (one_shot == GNUNET_SYSERR)
212  {
213  /* cannot call callback again, but return value saying that
214  * we have another full message in the buffer */
215  ret = GNUNET_NO;
216  goto copy;
217  }
218  if (one_shot == GNUNET_YES)
219  one_shot = GNUNET_SYSERR;
220  mst->off += want;
221  if (GNUNET_OK !=
222  (cbret = mst->cb (mst->cb_cls,
223  hdr)))
224  {
225  if (GNUNET_SYSERR == cbret)
227  "Failure processing message of type %u and size %u\n",
228  ntohs (hdr->type),
229  ntohs (hdr->size));
230  return GNUNET_SYSERR;
231  }
232  if (mst->off == mst->pos)
233  {
234  /* reset to beginning of buffer, it's free right now! */
235  mst->off = 0;
236  mst->pos = 0;
237  }
238  }
239  GNUNET_assert (0 == mst->pos);
240  while (size > 0)
241  {
242  unsigned long offset = (unsigned long) buf;
243  bool need_align = (0 != (offset % ALIGN_FACTOR));
244 
246  "Server-mst has %u bytes left in inbound buffer\n",
247  (unsigned int) size);
248  if (size < sizeof(struct GNUNET_MessageHeader))
249  break;
250  if (! need_align)
251  {
252  /* can try to do zero-copy and process directly from original buffer */
253  hdr = (const struct GNUNET_MessageHeader *) buf;
254  want = ntohs (hdr->size);
255  if (want < sizeof(struct GNUNET_MessageHeader))
256  {
257  GNUNET_break_op (0);
258  mst->off = 0;
259  return GNUNET_SYSERR;
260  }
261  if (size < want)
262  break; /* or not: buffer incomplete, so copy to private buffer... */
263  if (one_shot == GNUNET_SYSERR)
264  {
265  /* cannot call callback again, but return value saying that
266  * we have another full message in the buffer */
267  ret = GNUNET_NO;
268  goto copy;
269  }
270  if (GNUNET_YES == one_shot)
271  one_shot = GNUNET_SYSERR;
272  if (GNUNET_OK !=
273  (cbret = mst->cb (mst->cb_cls,
274  hdr)))
275  {
276  if (GNUNET_SYSERR == cbret)
278  "Failure processing message of type %u and size %u\n",
279  ntohs (hdr->type),
280  ntohs (hdr->size));
281  return GNUNET_SYSERR;
282  }
283  buf += want;
284  size -= want;
285  }
286  else
287  {
288  /* need to copy to private buffer to align;
289  * yes, we go a bit more spaghetti than usual here */
290  goto do_align;
291  }
292  }
293 copy:
294  if ((size > 0) && (! purge))
295  {
296  if (size + mst->pos > mst->curr_buf)
297  {
298  mst->hdr = GNUNET_realloc (mst->hdr,
299  size + mst->pos);
300  ibuf = (char *) mst->hdr;
301  mst->curr_buf = size + mst->pos;
302  }
303  GNUNET_assert (size + mst->pos <= mst->curr_buf);
304  GNUNET_memcpy (&ibuf[mst->pos],
305  buf,
306  size);
307  mst->pos += size;
308  }
309  if (purge)
310  {
311  mst->off = 0;
312  mst->pos = 0;
313  }
315  "Server-mst leaves %u (%u/%u) bytes in private buffer\n",
316  (unsigned int) (mst->pos - mst->off),
317  (unsigned int) mst->pos,
318  (unsigned int) mst->off);
319  return ret;
320 }
321 
322 
339  struct GNUNET_NETWORK_Handle *sock,
340  int purge,
341  int one_shot)
342 {
343  ssize_t ret;
344  size_t left;
345  char *buf;
346 
347  left = mst->curr_buf - mst->pos;
348  buf = (char *) mst->hdr;
350  &buf[mst->pos],
351  left);
352  if (-1 == ret)
353  {
354  if ((EAGAIN == errno) ||
355  (EINTR == errno))
356  return GNUNET_OK;
358  "recv");
359  return GNUNET_SYSERR;
360  }
361  if (0 == ret)
362  {
363  /* other side closed connection, treat as error */
364  return GNUNET_SYSERR;
365  }
366  mst->pos += ret;
367  return GNUNET_MST_from_buffer (mst,
368  NULL,
369  0,
370  purge,
371  one_shot);
372 }
373 
374 
388  int one_shot)
389 {
390  return GNUNET_MST_from_buffer (mst,
391  NULL,
392  0,
393  GNUNET_NO,
394  one_shot);
395 }
396 
397 
403 void
405 {
406  GNUNET_free (mst->hdr);
407  GNUNET_free (mst);
408 }
409 
410 
411 /* end of server_mst.c */
static size_t do_align(size_t start_position, size_t end_position)
Given the start and end position of a block of data, return the end position of that data after align...
Definition: fs_directory.c:488
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static char buf[2048]
#define GNUNET_MIN_MESSAGE_SIZE
Smallest supported message.
#define GNUNET_log(kind,...)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_GenericReturnValue
Named constants for return values.
#define GNUNET_MIN(a, b)
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level 'level' that indicates a failure of the command 'cmd' with the mess...
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_realloc(ptr, size)
Wrapper around realloc.
#define GNUNET_free(ptr)
Wrapper around free.
ssize_t GNUNET_NETWORK_socket_recv(const struct GNUNET_NETWORK_Handle *desc, void *buffer, size_t length)
Read data from a connected socket (always non-blocking).
Definition: network.c:717
int(* GNUNET_MessageTokenizerCallback)(void *cls, const struct GNUNET_MessageHeader *message)
Functions with this signature are called whenever a complete message is received by the tokenizer.
enum GNUNET_GenericReturnValue GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, int one_shot)
Obtain the next message from the mst, assuming that there are more unprocessed messages in the intern...
Definition: mst.c:387
enum GNUNET_GenericReturnValue GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:101
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:404
enum GNUNET_GenericReturnValue GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, struct GNUNET_NETWORK_Handle *sock, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:338
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:86
#define ALIGN_FACTOR
Definition: mst.c:35
#define LOG(kind,...)
Definition: mst.c:38
static unsigned int size
Size of the "table".
Definition: peer.c:68
static struct GNUNET_TIME_Relative delta
Definition: speedup.c:36
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Handle to a message stream tokenizer.
Definition: mst.c:45
size_t curr_buf
Size of the buffer (starting at hdr).
Definition: mst.c:59
size_t pos
How many bytes in buffer are valid right now?
Definition: mst.c:69
void * cb_cls
Closure for cb.
Definition: mst.c:54
struct GNUNET_MessageHeader * hdr
Beginning of the buffer.
Definition: mst.c:74
size_t off
How many bytes in buffer have we already processed?
Definition: mst.c:64
GNUNET_MessageTokenizerCallback cb
Function to call on completed messages.
Definition: mst.c:49
handle to a socket
Definition: network.c:54