GNUnet  0.10.x
mst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010, 2016, 2017 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_util_lib.h"
29 
30 
31 #if HAVE_UNALIGNED_64_ACCESS
32 #define ALIGN_FACTOR 4
33 #else
34 #define ALIGN_FACTOR 8
35 #endif
36 
37 #define LOG(kind, ...) GNUNET_log_from(kind, "util-mst", __VA_ARGS__)
38 
39 
48 
52  void *cb_cls;
53 
57  size_t curr_buf;
58 
62  size_t off;
63 
67  size_t pos;
68 
73 };
74 
75 
85  void *cb_cls)
86 {
88 
92  ret->cb = cb;
93  ret->cb_cls = cb_cls;
94  return ret;
95 }
96 
97 
112 int
114  const char *buf,
115  size_t size,
116  int purge,
117  int one_shot)
118 {
119  const struct GNUNET_MessageHeader *hdr;
120  size_t delta;
121  uint16_t want;
122  char *ibuf;
123  int need_align;
124  unsigned long offset;
125  int ret;
126  int cbret;
127 
128  GNUNET_assert(mst->off <= mst->pos);
129  GNUNET_assert(mst->pos <= mst->curr_buf);
131  "MST receives %u bytes with %u bytes already in private buffer\n",
132  (unsigned int)size,
133  (unsigned int)(mst->pos - mst->off));
134  ret = GNUNET_OK;
135  ibuf = (char *)mst->hdr;
136  while (mst->pos > 0)
137  {
138 do_align:
139  GNUNET_assert(mst->pos >= mst->off);
140  if ((mst->curr_buf - mst->off < sizeof(struct GNUNET_MessageHeader)) ||
141  (0 != (mst->off % ALIGN_FACTOR)))
142  {
143  /* need to align or need more space */
144  mst->pos -= mst->off;
145  memmove(ibuf,
146  &ibuf[mst->off],
147  mst->pos);
148  mst->off = 0;
149  }
150  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
151  {
152  delta
153  = GNUNET_MIN(sizeof(struct GNUNET_MessageHeader)
154  - (mst->pos - mst->off),
155  size);
156  GNUNET_memcpy(&ibuf[mst->pos],
157  buf,
158  delta);
159  mst->pos += delta;
160  buf += delta;
161  size -= delta;
162  }
163  if (mst->pos - mst->off < sizeof(struct GNUNET_MessageHeader))
164  {
165  if (purge)
166  {
167  mst->off = 0;
168  mst->pos = 0;
169  }
170  return GNUNET_OK;
171  }
172  hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
173  want = ntohs(hdr->size);
174  if (want < sizeof(struct GNUNET_MessageHeader))
175  {
176  GNUNET_break_op(0);
177  return GNUNET_SYSERR;
178  }
179  if ((mst->curr_buf - mst->off < want) &&
180  (mst->off > 0))
181  {
182  /* can get more space by moving */
183  mst->pos -= mst->off;
184  memmove(ibuf,
185  &ibuf[mst->off],
186  mst->pos);
187  mst->off = 0;
188  }
189  if (mst->curr_buf < want)
190  {
191  /* need to get more space by growing buffer */
192  GNUNET_assert(0 == mst->off);
193  mst->hdr = GNUNET_realloc(mst->hdr,
194  want);
195  ibuf = (char *)mst->hdr;
196  mst->curr_buf = want;
197  }
198  hdr = (const struct GNUNET_MessageHeader *)&ibuf[mst->off];
199  if (mst->pos - mst->off < want)
200  {
201  delta = GNUNET_MIN(want - (mst->pos - mst->off),
202  size);
203  GNUNET_assert(mst->pos + delta <= mst->curr_buf);
204  GNUNET_memcpy(&ibuf[mst->pos],
205  buf,
206  delta);
207  mst->pos += delta;
208  buf += delta;
209  size -= delta;
210  }
211  if (mst->pos - mst->off < want)
212  {
213  if (purge)
214  {
215  mst->off = 0;
216  mst->pos = 0;
217  }
218  return GNUNET_OK;
219  }
220  if (one_shot == GNUNET_SYSERR)
221  {
222  /* cannot call callback again, but return value saying that
223  * we have another full message in the buffer */
224  ret = GNUNET_NO;
225  goto copy;
226  }
227  if (one_shot == GNUNET_YES)
228  one_shot = GNUNET_SYSERR;
229  mst->off += want;
230  if (GNUNET_OK !=
231  (cbret = mst->cb(mst->cb_cls,
232  hdr)))
233  {
234  if (GNUNET_SYSERR == cbret)
236  "Failure processing message of type %u and size %u\n",
237  ntohs(hdr->type),
238  ntohs(hdr->size));
239  return GNUNET_SYSERR;
240  }
241  if (mst->off == mst->pos)
242  {
243  /* reset to beginning of buffer, it's free right now! */
244  mst->off = 0;
245  mst->pos = 0;
246  }
247  }
248  GNUNET_assert(0 == mst->pos);
249  while (size > 0)
250  {
252  "Server-mst has %u bytes left in inbound buffer\n",
253  (unsigned int)size);
254  if (size < sizeof(struct GNUNET_MessageHeader))
255  break;
256  offset = (unsigned long)buf;
257  need_align = (0 != (offset % ALIGN_FACTOR)) ? GNUNET_YES : GNUNET_NO;
258  if (GNUNET_NO == need_align)
259  {
260  /* can try to do zero-copy and process directly from original buffer */
261  hdr = (const struct GNUNET_MessageHeader *)buf;
262  want = ntohs(hdr->size);
263  if (want < sizeof(struct GNUNET_MessageHeader))
264  {
265  GNUNET_break_op(0);
266  mst->off = 0;
267  return GNUNET_SYSERR;
268  }
269  if (size < want)
270  break; /* or not: buffer incomplete, so copy to private buffer... */
271  if (one_shot == GNUNET_SYSERR)
272  {
273  /* cannot call callback again, but return value saying that
274  * we have another full message in the buffer */
275  ret = GNUNET_NO;
276  goto copy;
277  }
278  if (one_shot == GNUNET_YES)
279  one_shot = GNUNET_SYSERR;
280  if (GNUNET_OK !=
281  (cbret = mst->cb(mst->cb_cls,
282  hdr)))
283  {
284  if (GNUNET_SYSERR == cbret)
286  "Failure processing message of type %u and size %u\n",
287  ntohs(hdr->type),
288  ntohs(hdr->size));
289  return GNUNET_SYSERR;
290  }
291  buf += want;
292  size -= want;
293  }
294  else
295  {
296  /* need to copy to private buffer to align;
297  * yes, we go a bit more spagetti than usual here */
298  goto do_align;
299  }
300  }
301 copy:
302  if ((size > 0) && (!purge))
303  {
304  if (size + mst->pos > mst->curr_buf)
305  {
306  mst->hdr = GNUNET_realloc(mst->hdr,
307  size + mst->pos);
308  ibuf = (char *)mst->hdr;
309  mst->curr_buf = size + mst->pos;
310  }
311  GNUNET_assert(size + mst->pos <= mst->curr_buf);
312  GNUNET_memcpy(&ibuf[mst->pos],
313  buf,
314  size);
315  mst->pos += size;
316  }
317  if (purge)
318  {
319  mst->off = 0;
320  mst->pos = 0;
321  }
323  "Server-mst leaves %u bytes in private buffer\n",
324  (unsigned int)(mst->pos - mst->off));
325  return ret;
326 }
327 
328 
343 int
345  struct GNUNET_NETWORK_Handle *sock,
346  int purge,
347  int one_shot)
348 {
349  ssize_t ret;
350  size_t left;
351  char *buf;
352 
353  left = mst->curr_buf - mst->pos;
354  buf = (char *)mst->hdr;
355  ret = GNUNET_NETWORK_socket_recv(sock,
356  &buf[mst->pos],
357  left);
358  if (-1 == ret)
359  {
360  if ((EAGAIN == errno) ||
361  (EINTR == errno))
362  return GNUNET_OK;
364  "recv");
365  return GNUNET_SYSERR;
366  }
367  if (0 == ret)
368  {
369  /* other side closed connection, treat as error */
370  return GNUNET_SYSERR;
371  }
372  mst->pos += ret;
373  return GNUNET_MST_from_buffer(mst,
374  NULL,
375  0,
376  purge,
377  one_shot);
378 }
379 
380 
392 int
394  int one_shot)
395 {
396  return GNUNET_MST_from_buffer(mst,
397  NULL,
398  0,
399  GNUNET_NO,
400  one_shot);
401 }
402 
403 
409 void
411 {
412  GNUNET_free(mst->hdr);
413  GNUNET_free(mst);
414 }
415 
416 
417 
418 /* end of server_mst.c */
size_t off
How many bytes in buffer have we already processed?
Definition: mst.c:62
static struct GNUNET_TIME_Relative delta
Definition: speedup.c:35
void * cb_cls
Closure for cb.
Definition: mst.c:52
static size_t do_align(size_t start_position, size_t end_position)
Given the start and end position of a block of data, return the end position of that data after align...
Definition: fs_directory.c:480
ssize_t GNUNET_NETWORK_socket_recv(const struct GNUNET_NETWORK_Handle *desc, void *buffer, size_t length)
Read data from a connected socket (always non-blocking).
Definition: network.c:775
#define ALIGN_FACTOR
Definition: mst.c:34
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
int GNUNET_MST_next(struct GNUNET_MessageStreamTokenizer *mst, int one_shot)
Obtain the next message from the mst, assuming that there are more unprocessed messages in the intern...
Definition: mst.c:393
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
#define GNUNET_NO
Definition: gnunet_common.h:78
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
#define GNUNET_new(type)
Allocate a struct or union of the given type.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static int ret
Final status code.
Definition: gnunet-arm.c:89
int(* GNUNET_MessageTokenizerCallback)(void *cls, const struct GNUNET_MessageHeader *message)
Functions with this signature are called whenever a complete message is received by the tokenizer...
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level &#39;level&#39; that indicates a failure of the command &#39;cmd&#39; with the mess...
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:410
#define GNUNET_break_op(cond)
Use this for assertion violations caused by other peers (i.e.
#define GNUNET_realloc(ptr, size)
Wrapper around realloc.
Handle to a message stream tokenizer.
Definition: mst.c:43
#define GNUNET_MIN(a, b)
Definition: gnunet_common.h:80
static char buf[2048]
#define GNUNET_MIN_MESSAGE_SIZE
Smallest supported message.
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:84
int GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:113
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
struct GNUNET_MessageHeader * hdr
Beginning of the buffer.
Definition: mst.c:72
size_t pos
How many bytes in buffer are valid right now?
Definition: mst.c:67
#define GNUNET_log(kind,...)
int GNUNET_MST_read(struct GNUNET_MessageStreamTokenizer *mst, struct GNUNET_NETWORK_Handle *sock, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:344
#define LOG(kind,...)
Definition: mst.c:37
handle to a socket
Definition: network.c:46
size_t curr_buf
Size of the buffer (starting at hdr).
Definition: mst.c:57
Header for all communications.
#define GNUNET_YES
Definition: gnunet_common.h:77
GNUNET_MessageTokenizerCallback cb
Function to call on completed messages.
Definition: mst.c:47
#define GNUNET_malloc(size)
Wrapper around malloc.
#define GNUNET_free(ptr)
Wrapper around free.