GNUnet  0.16.x
mst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 
30 
/* Alignment (in bytes) required of a message header's position before the
 * tokenizer processes it: buffer offsets and input pointers are checked
 * with "% ALIGN_FACTOR".  4 if HAVE_UNALIGNED_64_ACCESS is set, else 8. */
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36 
/* Logging shorthand for this file: forwards to GNUNET_log_from with the
 * fixed component string "util-mst". */
37 #define LOG(kind, ...) GNUNET_log_from (kind, "util-mst", __VA_ARGS__)
38 
39 
/* Handle to a message stream tokenizer.
 *
 * NOTE(review): this listing omits the struct header line and the
 * declarations of `cb` (original line 48, function to call on completed
 * messages) and `hdr` (original line 73, beginning of the buffer); both
 * members are used by the functions below. */
44 {
49 
 /* Closure for cb. */
53  void *cb_cls;
54 
 /* Size of the buffer (starting at hdr). */
58  size_t curr_buf;
59 
 /* How many bytes in buffer have we already processed? */
63  size_t off;
64 
 /* How many bytes in buffer are valid right now? */
68  size_t pos;
69 
74 };
75 
76 
/* GNUNET_MST_create: create a message stream tokenizer.
 *
 * Records the callback `cb` and its closure `cb_cls` in the new tokenizer
 * and sets the initial buffer size to GNUNET_MIN_MESSAGE_SIZE.
 *
 * NOTE(review): the function-name line and the lines that declare and
 * allocate `ret` (original lines 85, 88, 90, 91) are missing from this
 * listing; presumably `ret` and its buffer are allocated there -- confirm
 * against the real file.
 *
 * Returns the new tokenizer handle. */
86  void *cb_cls)
87 {
89 
 /* the private buffer starts out just large enough for the smallest message */
92  ret->curr_buf = GNUNET_MIN_MESSAGE_SIZE;
93  ret->cb = cb;
94  ret->cb_cls = cb_cls;
95  return ret;
96 }
97 
98 
/* GNUNET_MST_from_buffer: add incoming data to the receive buffer and call
 * the callback for all complete messages.
 *
 * NOTE(review): the line with the function name and the `mst` parameter
 * (original line 114) and the opening lines of several logging calls are
 * missing from this listing; the remaining lines are left untouched.
 *
 * Parameters:
 *   mst      tokenizer whose private buffer (hdr/curr_buf/off/pos) and
 *            callback are used
 *   buf      input bytes to add; NULL is passed together with size 0 by
 *            the other functions in this file
 *   size     number of bytes available at buf
 *   purge    if non-zero, bytes that do not form a complete message are
 *            discarded (off and pos reset to 0) instead of being kept
 *   one_shot if GNUNET_YES, the callback is invoked for at most one
 *            message; it is then switched to GNUNET_SYSERR internally,
 *            which stops further callbacks
 *
 * Returns GNUNET_OK by default (including the early returns taken when a
 * message is still incomplete), GNUNET_NO if one_shot prevented delivery of
 * another complete message that is available, and GNUNET_SYSERR if a header
 * announces a size smaller than struct GNUNET_MessageHeader or the callback
 * returns anything other than GNUNET_OK. */
113 int
115  const char *buf,
116  size_t size,
117  int purge,
118  int one_shot)
119 {
120  const struct GNUNET_MessageHeader *hdr;
121  size_t delta;
122  uint16_t want;
123  char *ibuf;
124  int need_align;
125  unsigned long offset;
126  int ret;
127  int cbret;
128 
 /* invariant of the private buffer: off <= pos <= curr_buf */
129  GNUNET_assert (mst->off <= mst->pos);
130  GNUNET_assert (mst->pos <= mst->curr_buf);
132  "MST receives %u bytes with %u (%u/%u) bytes already in private buffer\n",
133  (unsigned int) size,
134  (unsigned int) (mst->pos - mst->off),
135  (unsigned int) mst->pos,
136  (unsigned int) mst->off);
137  ret = GNUNET_OK;
138  ibuf = (char *) mst->hdr;
 /* Phase 1: complete and deliver messages that start in the private
  * buffer, topping them up from buf as needed.  The body is also entered
  * through "goto do_align" from phase 2 when the input pointer is not
  * aligned (the private buffer is empty at that point). */
139  while (mst->pos > 0)
140  {
141 do_align:
142  GNUNET_assert (mst->pos >= mst->off);
143  if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
144  (0 != (mst->off % ALIGN_FACTOR)))
145  {
146  /* need to align or need more space */
147  mst->pos -= mst->off;
148  memmove (ibuf,
149  &ibuf[mst->off],
150  mst->pos);
151  mst->off = 0;
152  }
 /* fewer than a header's worth of unprocessed bytes: pull the rest of
  * the header from the input */
153  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
154  {
155  delta
156  = GNUNET_MIN (sizeof(struct GNUNET_MessageHeader)
157  - (mst->pos - mst->off),
158  size);
159  GNUNET_memcpy (&ibuf[mst->pos],
160  buf,
161  delta);
162  mst->pos += delta;
163  buf += delta;
164  size -= delta;
165  }
 /* header still incomplete: input is exhausted, wait for more data
  * (or drop the fragment when purging) */
166  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
167  {
168  if (purge)
169  {
170  mst->off = 0;
171  mst->pos = 0;
172  }
173  return GNUNET_OK;
174  }
175  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
176  want = ntohs (hdr->size);
178  "We want to read message of size %u\n",
179  want);
 /* NOTE(review): this error path leaves off/pos untouched, while the
  * matching check in phase 2 resets mst->off before returning -- confirm
  * whether the difference is intended. */
180  if (want < sizeof(struct GNUNET_MessageHeader))
181  {
182  GNUNET_break_op (0);
183  return GNUNET_SYSERR;
184  }
185  if ((mst->curr_buf - mst->off < want) &&
186  (mst->off > 0))
187  {
188  /* can get more space by moving */
189  mst->pos -= mst->off;
190  memmove (ibuf,
191  &ibuf[mst->off],
192  mst->pos);
193  mst->off = 0;
194  }
195  if (mst->curr_buf < want)
196  {
197  /* need to get more space by growing buffer */
198  GNUNET_assert (0 == mst->off);
199  mst->hdr = GNUNET_realloc (mst->hdr,
200  want);
201  ibuf = (char *) mst->hdr;
202  mst->curr_buf = want;
203  }
 /* re-derive hdr: the buffer may have been moved or reallocated above */
204  hdr = (const struct GNUNET_MessageHeader *) &ibuf[mst->off];
205  if (mst->pos - mst->off < want)
206  {
207  delta = GNUNET_MIN (want - (mst->pos - mst->off),
208  size);
209  GNUNET_assert (mst->pos + delta <= mst->curr_buf);
210  GNUNET_memcpy (&ibuf[mst->pos],
211  buf,
212  delta);
213  mst->pos += delta;
214  buf += delta;
215  size -= delta;
216  }
 /* message body still incomplete: wait for more data (or drop when
  * purging) */
217  if (mst->pos - mst->off < want)
218  {
219  if (purge)
220  {
221  mst->off = 0;
222  mst->pos = 0;
223  }
224  return GNUNET_OK;
225  }
226  if (one_shot == GNUNET_SYSERR)
227  {
228  /* cannot call callback again, but return value saying that
229  * we have another full message in the buffer */
230  ret = GNUNET_NO;
231  goto copy;
232  }
233  if (one_shot == GNUNET_YES)
234  one_shot = GNUNET_SYSERR;
 /* off is advanced past the message before the callback is invoked */
235  mst->off += want;
236  if (GNUNET_OK !=
237  (cbret = mst->cb (mst->cb_cls,
238  hdr)))
239  {
240  if (GNUNET_SYSERR == cbret)
242  "Failure processing message of type %u and size %u\n",
243  ntohs (hdr->type),
244  ntohs (hdr->size));
245  return GNUNET_SYSERR;
246  }
247  if (mst->off == mst->pos)
248  {
249  /* reset to beginning of buffer, it's free right now! */
250  mst->off = 0;
251  mst->pos = 0;
252  }
253  }
 /* Phase 2: the private buffer is empty; deliver aligned, complete
  * messages straight from the caller's buffer without copying. */
254  GNUNET_assert (0 == mst->pos);
255  while (size > 0)
256  {
258  "Server-mst has %u bytes left in inbound buffer\n",
259  (unsigned int) size);
260  if (size < sizeof(struct GNUNET_MessageHeader))
261  break;
 /* NOTE(review): the pointer is cast to unsigned long; only the value
  * modulo ALIGN_FACTOR is used, but uintptr_t would be the portable type. */
262  offset = (unsigned long) buf;
263  need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
264  if (GNUNET_NO == need_align)
265  {
266  /* can try to do zero-copy and process directly from original buffer */
267  hdr = (const struct GNUNET_MessageHeader *) buf;
268  want = ntohs (hdr->size);
269  if (want < sizeof(struct GNUNET_MessageHeader))
270  {
271  GNUNET_break_op (0);
272  mst->off = 0;
273  return GNUNET_SYSERR;
274  }
275  if (size < want)
276  break; /* or not: buffer incomplete, so copy to private buffer... */
277  if (one_shot == GNUNET_SYSERR)
278  {
279  /* cannot call callback again, but return value saying that
280  * we have another full message in the buffer */
281  ret = GNUNET_NO;
282  goto copy;
283  }
284  if (one_shot == GNUNET_YES)
285  one_shot = GNUNET_SYSERR;
286  if (GNUNET_OK !=
287  (cbret = mst->cb (mst->cb_cls,
288  hdr)))
289  {
290  if (GNUNET_SYSERR == cbret)
292  "Failure processing message of type %u and size %u\n",
293  ntohs (hdr->type),
294  ntohs (hdr->size));
295  return GNUNET_SYSERR;
296  }
297  buf += want;
298  size -= want;
299  }
300  else
301  {
302  /* need to copy to private buffer to align;
303  * yes, we go a bit more spaghetti than usual here */
304  goto do_align;
305  }
306  }
 /* Phase 3: keep whatever input is left (unless purging) by appending it
  * to the private buffer, growing the buffer if it does not fit. */
307 copy:
308  if ((size > 0) && (! purge))
309  {
310  if (size + mst->pos > mst->curr_buf)
311  {
312  mst->hdr = GNUNET_realloc (mst->hdr,
313  size + mst->pos);
314  ibuf = (char *) mst->hdr;
315  mst->curr_buf = size + mst->pos;
316  }
317  GNUNET_assert (size + mst->pos <= mst->curr_buf);
318  GNUNET_memcpy (&ibuf[mst->pos],
319  buf,
320  size);
321  mst->pos += size;
322  }
323  if (purge)
324  {
325  mst->off = 0;
326  mst->pos = 0;
327  }
329  "Server-mst leaves %u (%u/%u) bytes in private buffer\n",
330  (unsigned int) (mst->pos - mst->off),
331  (unsigned int) mst->pos,
332  (unsigned int) mst->off);
333  return ret;
334 }
335 
336 
/* GNUNET_MST_read: add incoming data to the receive buffer and call the
 * callback for all complete messages.
 *
 * Receives from `sock` into the unused tail of the tokenizer's private
 * buffer, then lets GNUNET_MST_from_buffer (called with no extra input)
 * tokenize what is buffered.
 *
 * NOTE(review): the line with the function name and the `mst` parameter
 * (original line 352), the receive call itself (line 363) and the opener of
 * the error log (line 371) are missing from this listing; the symbol index
 * suggests GNUNET_NETWORK_socket_recv and GNUNET_log_strerror -- confirm.
 *
 * Parameters:
 *   sock     socket to receive from
 *   purge    passed through to GNUNET_MST_from_buffer
 *   one_shot passed through to GNUNET_MST_from_buffer
 *
 * Returns GNUNET_OK if the receive failed with EAGAIN or EINTR,
 * GNUNET_SYSERR on any other receive error or when 0 bytes were received,
 * and otherwise the result of GNUNET_MST_from_buffer. */
351 int
353  struct GNUNET_NETWORK_Handle *sock,
354  int purge,
355  int one_shot)
356 {
357  ssize_t ret;
358  size_t left;
359  char *buf;
360 
 /* free space at the end of the private buffer.
  * NOTE(review): if pos == curr_buf then left is 0; a receive of length 0
  * would presumably return 0 and be reported below as a closed
  * connection -- confirm callers cannot reach this state. */
361  left = mst->curr_buf - mst->pos;
362  buf = (char *) mst->hdr;
364  &buf[mst->pos],
365  left);
366  if (-1 == ret)
367  {
 /* transient conditions are not errors; the caller can retry later */
368  if ((EAGAIN == errno) ||
369  (EINTR == errno))
370  return GNUNET_OK;
372  "recv");
373  return GNUNET_SYSERR;
374  }
375  if (0 == ret)
376  {
377  /* other side closed connection, treat as error */
378  return GNUNET_SYSERR;
379  }
 /* the new bytes are already in the private buffer, so only pos moves and
  * no additional input is handed to the tokenizer */
380  mst->pos += ret;
381  return GNUNET_MST_from_buffer (mst,
382  NULL,
383  0,
384  purge,
385  one_shot);
386 }
387 
388 
/* GNUNET_MST_next: obtain the next message from the tokenizer, assuming
 * that there are more unprocessed messages in its internal buffer.
 *
 * Runs GNUNET_MST_from_buffer with no new input (NULL, 0) and with purge
 * disabled (GNUNET_NO), so only already-buffered data is processed and
 * incomplete data is kept.
 *
 * NOTE(review): the line with the function name and the `mst` parameter
 * (original line 401) is missing from this listing.
 *
 * Parameters:
 *   one_shot passed through to GNUNET_MST_from_buffer
 *
 * Returns the result of GNUNET_MST_from_buffer. */
400 int
402  int one_shot)
403 {
404  return GNUNET_MST_from_buffer (mst,
405  NULL,
406  0,
407  GNUNET_NO,
408  one_shot);
409 }
410 
411 
/* GNUNET_MST_destroy: destroys a tokenizer.
 *
 * Releases the tokenizer's private buffer (mst->hdr) and then the handle
 * itself; `mst` must not be used afterwards.
 *
 * NOTE(review): the line with the function name and the `mst` parameter
 * (original line 418) is missing from this listing. */
417 void
419 {
420  GNUNET_free (mst->hdr);
421  GNUNET_free (mst);
422 }
423 
424 
425 /* end of mst.c */
static size_t do_align(size_t start_position, size_t end_position)
Given the start and end position of a block of data, return the end position of that data after align...
Definition: fs_directory.c:487
static int ret
Return value of the commandline.
Definition: gnunet-abd.c:81
static char buf[2048]
#define GNUNET_log(kind,...)
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
@ GNUNET_OK
Definition: gnunet_common.h:95
@ GNUNET_YES
Definition: gnunet_common.h:97
@ GNUNET_NO
Definition: gnunet_common.h:94
@ GNUNET_SYSERR
Definition: gnunet_common.h:93
#define GNUNET_MIN(a, b)
#define GNUNET_MIN_MESSAGE_SIZE
Smallest supported message.
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level 'level' that indicates a failure of the command 'cmd' with the mess...
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_DEBUG
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_realloc(ptr, size)
Wrapper around realloc.
#define GNUNET_free(ptr)
Wrapper around free.
ssize_t GNUNET_NETWORK_socket_recv(const struct GNUNET_NETWORK_Handle *desc, void *buffer, size_t length)
Read data from a connected socket (always non-blocking).
Definition: network.c:731
int GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, int one_shot)
Obtain the next message from the mst, assuming that there are more unprocessed messages in the intern...
Definition: mst.c:401
int GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, struct GNUNET_NETWORK_Handle *sock, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:352
int(* GNUNET_MessageTokenizerCallback)(void *cls, const struct GNUNET_MessageHeader *message)
Functions with this signature are called whenever a complete message is received by the tokenizer.
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:418
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:85
int GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:114
#define ALIGN_FACTOR
Definition: mst.c:34
#define LOG(kind,...)
Definition: mst.c:37
static unsigned int size
Size of the "table".
Definition: peer.c:67
static struct GNUNET_TIME_Relative delta
Definition: speedup.c:35
Header for all communications.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Handle to a message stream tokenizer.
Definition: mst.c:44
size_t curr_buf
Size of the buffer (starting at hdr).
Definition: mst.c:58
size_t pos
How many bytes in buffer are valid right now?
Definition: mst.c:68
void * cb_cls
Closure for cb.
Definition: mst.c:53
struct GNUNET_MessageHeader * hdr
Beginning of the buffer.
Definition: mst.c:73
size_t off
How many bytes in buffer have we already processed?
Definition: mst.c:63
GNUNET_MessageTokenizerCallback cb
Function to call on completed messages.
Definition: mst.c:48
handle to a socket
Definition: network.c:53