GNUnet  0.19.3
pq_event.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2021, 2023 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
26 #include "pq.h"
27 #include <pthread.h>
28 
29 
34 {
38  struct GNUNET_ShortHashCode sh;
39 
44 
48  void *cb_cls;
49 
54 
59 };
60 
61 
68 static void
69 es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70  struct GNUNET_ShortHashCode *sh)
71 {
72  struct GNUNET_HashCode h_channel;
73 
75  ntohs (es->size),
76  &h_channel);
77  GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78  memcpy (sh,
79  &h_channel,
80  sizeof (*sh));
81 }
82 
83 
92 static char *
94  char identifier[64])
95 {
96  char *end;
97 
99  sizeof (*sh),
100  identifier,
101  63);
102  GNUNET_assert (NULL != end);
103  *end = '\0';
104  return end;
105 }
106 
107 
115 static enum GNUNET_GenericReturnValue
116 channel_to_sh (const char *identifier,
117  struct GNUNET_ShortHashCode *sh)
118 {
119  return GNUNET_STRINGS_string_to_data (identifier,
120  strlen (identifier),
121  sh,
122  sizeof (*sh));
123 }
124 
125 
134 static char *
136  char identifier[64])
137 {
138  struct GNUNET_ShortHashCode sh;
139 
140  es_to_sh (es,
141  &sh);
142  return sh_to_channel (&sh,
143  identifier);
144 }
145 
146 
151 {
155  void *extra;
156 
160  size_t extra_size;
161 };
162 
163 
173 static int
174 do_notify (void *cls,
175  const struct GNUNET_ShortHashCode *sh,
176  void *value)
177 {
178  struct NotifyContext *ctx = cls;
179  struct GNUNET_DB_EventHandler *eh = value;
180 
181  eh->cb (eh->cb_cls,
182  ctx->extra,
183  ctx->extra_size);
184  return GNUNET_OK;
185 }
186 
187 
188 static void
190 {
191  PGnotify *n;
192  unsigned int cnt = 0;
193 
195  "PG poll job active\n");
196  if (1 !=
197  PQconsumeInput (db->conn))
198  {
200  "Failed to read from Postgres: %s\n",
201  PQerrorMessage (db->conn));
202  if (CONNECTION_BAD != PQstatus (db->conn))
203  return;
205  return;
206  }
207  while (NULL != (n = PQnotifies (db->conn)))
208  {
209  struct GNUNET_ShortHashCode sh;
210  struct NotifyContext ctx = {
211  .extra = NULL
212  };
213 
214  cnt++;
215  if ('X' != toupper ((int) n->relname[0]))
216  {
218  "Ignoring notification for unsupported channel identifier `%s'\n",
219  n->relname);
220  PQfreemem (n);
221  continue;
222  }
223  if (GNUNET_OK !=
224  channel_to_sh (&n->relname[1],
225  &sh))
226  {
228  "Ignoring notification for unsupported channel identifier `%s'\n",
229  n->relname);
230  PQfreemem (n);
231  continue;
232  }
233  if ( (NULL != n->extra) &&
234  (GNUNET_OK !=
236  strlen (n->extra),
237  &ctx.extra,
238  &ctx.extra_size)))
239  {
241  "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
242  n->extra,
243  n->relname);
244  PQfreemem (n);
245  continue;
246  }
248  "Received notification %s with extra data `%.*s'\n",
249  n->relname,
250  (int) ctx.extra_size,
251  (const char *) ctx.extra);
253  &sh,
254  &do_notify,
255  &ctx);
256  GNUNET_free (ctx.extra);
257  PQfreemem (n);
258  }
260  "PG poll job finishes after %u events\n",
261  cnt);
262 }
263 
264 
271 static void
273 {
274  struct GNUNET_PQ_Context *db = cls;
275 
276  db->event_task = NULL;
277  if (NULL == db->rfd)
279  event_do_poll (db);
280  if (NULL != db->event_task)
281  return;
283  "Resubscribing\n");
284  if (NULL == db->rfd)
285  {
286  db->resubscribe_backoff
287  = GNUNET_TIME_relative_max (db->resubscribe_backoff,
289  db->resubscribe_backoff
290  = GNUNET_TIME_STD_BACKOFF (db->resubscribe_backoff);
293  db);
294  return;
295  }
296  db->resubscribe_backoff = GNUNET_TIME_UNIT_SECONDS;
297  db->event_task
299  db->rfd,
301  db);
302 }
303 
304 
312 static void
313 scheduler_fd_cb (void *cls,
314  int fd)
315 {
316  struct GNUNET_PQ_Context *db = cls;
317 
319  "New poll FD is %d\n",
320  fd);
321  if (NULL != db->event_task)
322  {
323  GNUNET_SCHEDULER_cancel (db->event_task);
324  db->event_task = NULL;
325  }
326  GNUNET_free (db->rfd);
327  if (-1 == fd)
328  return;
329  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
330  return;
332  "Activating poll job on %d\n",
333  fd);
335  db->event_task
337  db->rfd,
339  db);
340 }
341 
342 
350 static void
352  const char *cmd,
353  struct GNUNET_DB_EventHandler *eh)
354 {
355  char sql[16 + 64];
356  char *end;
357  PGresult *result;
358 
359  if (NULL == db->conn)
360  return;
361  end = stpcpy (sql,
362  cmd);
363  end = sh_to_channel (&eh->sh,
364  end);
366  "Executing PQ command `%s'\n",
367  sql);
368  result = PQexec (db->conn,
369  sql);
370  if (PGRES_COMMAND_OK != PQresultStatus (result))
371  {
373  "pq",
374  "Failed to execute `%s': %s/%s/%s/%s/%s",
375  sql,
376  PQresultErrorField (result,
377  PG_DIAG_MESSAGE_PRIMARY),
378  PQresultErrorField (result,
379  PG_DIAG_MESSAGE_DETAIL),
380  PQresultErrorMessage (result),
381  PQresStatus (PQresultStatus (result)),
382  PQerrorMessage (db->conn));
383  }
384  PQclear (result);
385 }
386 
387 
396 static enum GNUNET_GenericReturnValue
397 register_notify (void *cls,
398  const struct GNUNET_ShortHashCode *sh,
399  void *value)
400 {
401  struct GNUNET_PQ_Context *db = cls;
402  struct GNUNET_DB_EventHandler *eh = value;
403 
405  "LISTEN X",
406  eh);
407  return GNUNET_OK;
408 }
409 
410 
411 void
413  int fd)
414 {
416  "Change in PQ event FD to %d\n",
417  fd);
419  fd);
422  db);
423 }
424 
425 
432 static void
433 event_timeout (void *cls)
434 {
435  struct GNUNET_DB_EventHandler *eh = cls;
436 
437  eh->timeout_task = NULL;
438  eh->cb (eh->cb_cls,
439  NULL,
440  0);
441 }
442 
443 
444 struct GNUNET_DB_EventHandler *
446  const struct GNUNET_DB_EventHeaderP *es,
449  void *cb_cls)
450 {
451  struct GNUNET_DB_EventHandler *eh;
452  bool sub;
453 
454  eh = GNUNET_new (struct GNUNET_DB_EventHandler);
455  eh->db = db;
456  es_to_sh (es,
457  &eh->sh);
458  eh->cb = cb;
459  eh->cb_cls = cb_cls;
460  sub = (NULL ==
462  &eh->sh));
465  &eh->sh,
466  eh,
468  if (NULL == db->event_task)
469  {
471  "Starting event scheduler\n");
473  PQsocket (db->conn));
474  }
475  if (sub)
477  "LISTEN X",
478  eh);
480  &event_timeout,
481  eh);
482  return eh;
483 }
484 
485 
486 void
488 {
489  struct GNUNET_PQ_Context *db = eh->db;
490 
493  &eh->sh,
494  eh));
495  if (NULL ==
497  &eh->sh))
499  "UNLISTEN X",
500  eh);
501  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
502  {
504  "Stopping PQ event scheduler job\n");
505  GNUNET_free (db->rfd);
506  if (NULL != db->event_task)
507  {
508  GNUNET_SCHEDULER_cancel (db->event_task);
509  db->event_task = NULL;
510  }
511  }
512  if (NULL != eh->timeout_task)
513  {
515  eh->timeout_task = NULL;
516  }
517  GNUNET_free (eh);
518 }
519 
520 
521 char *
523 {
524  char sql[16 + 64 + 8];
525  char *end;
526 
527  end = stpcpy (sql,
528  "X");
529  end = es_to_channel (es,
530  end);
531  GNUNET_assert (NULL != end);
532  return GNUNET_strdup (sql);
533 }
534 
535 
536 void
538  const struct GNUNET_DB_EventHeaderP *es,
539  const void *extra,
540  size_t extra_size)
541 {
542  char sql[16 + 64 + extra_size * 8 / 5 + 8];
543  char *end;
544  PGresult *result;
545 
546  end = stpcpy (sql,
547  "NOTIFY X");
548  end = es_to_channel (es,
549  end);
550  end = stpcpy (end,
551  ", '");
553  extra_size,
554  end,
555  sizeof (sql) - (end - sql) - 1);
556  GNUNET_assert (NULL != end);
557  *end = '\0';
558  end = stpcpy (end,
559  "'");
561  "Executing command `%s'\n",
562  sql);
563  result = PQexec (db->conn,
564  sql);
565  if (PGRES_COMMAND_OK != PQresultStatus (result))
566  {
568  "pq",
569  "Failed to execute `%s': %s/%s/%s/%s/%s",
570  sql,
571  PQresultErrorField (result,
572  PG_DIAG_MESSAGE_PRIMARY),
573  PQresultErrorField (result,
574  PG_DIAG_MESSAGE_DETAIL),
575  PQresultErrorMessage (result),
576  PQresStatus (PQresultStatus (result)),
577  PQerrorMessage (db->conn));
578  }
579  PQclear (result);
580  event_do_poll (db);
581 }
582 
583 
584 /* end of pq_event.c */
static struct GNUNET_TIME_Relative timeout
Desired timeout for the lookup (default is no timeout).
Definition: gnunet-abd.c:61
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct SolverHandle * sh
static char * value
Value of the record to add/remove.
static int result
Global testing status.
static struct GNUNET_FS_DirectoryBuilder * db
Definition: gnunet-search.c:97
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
void(* GNUNET_DB_EventCallback)(void *cls, const void *extra, size_t extra_size)
Function called on events received from Postgres.
Definition: gnunet_db_lib.h:80
void GNUNET_PQ_reconnect(struct GNUNET_PQ_Context *db)
Reinitialize the database db.
Definition: pq_connect.c:325
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multishortmap_put(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
int GNUNET_CONTAINER_multishortmap_iterate(struct GNUNET_CONTAINER_MultiShortmap *map, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map.
int GNUNET_CONTAINER_multishortmap_get_multiple(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map that match a particular key.
void * GNUNET_CONTAINER_multishortmap_get(const struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key)
Given a key find a value in the map matching the key.
unsigned int GNUNET_CONTAINER_multishortmap_size(const struct GNUNET_CONTAINER_MultiShortmap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multishortmap_remove(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, const void *value)
Remove the given key-value pair from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_log(kind,...)
#define GNUNET_log_from(kind, comp,...)
GNUNET_GenericReturnValue
Named constants for return values.
#define GNUNET_static_assert(cond)
Assertion to be checked (if supported by C compiler) at compile time, otherwise checked at runtime an...
@ GNUNET_OK
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_strdup(a)
Wrapper around GNUNET_xstrdup_.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_NETWORK_Handle * GNUNET_NETWORK_socket_box_native(int fd)
Box a native socket (and check that it is a socket).
Definition: network.c:579
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net(struct GNUNET_TIME_Relative delay, struct GNUNET_NETWORK_Handle *rfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay or when the specified file descriptor is ready f...
Definition: scheduler.c:1475
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:944
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1241
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data_alloc(const char *enc, size_t enclen, void **out, size_t *out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:854
char * GNUNET_STRINGS_data_to_string(const void *data, size_t size, char *out, size_t out_size)
Convert binary data to ASCII encoding using CrockfordBase32.
Definition: strings.c:708
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data(const char *enc, size_t enclen, void *out, size_t out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:788
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:351
#define GNUNET_TIME_UNIT_SECONDS
One second.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
shared internal data structures of libgnunetpq
static enum GNUNET_GenericReturnValue channel_to_sh(const char *identifier, struct GNUNET_ShortHashCode *sh)
Convert sh to a Postgres identifier.
Definition: pq_event.c:116
static void es_to_sh(const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_ShortHashCode *sh)
Convert es to a short hash.
Definition: pq_event.c:69
void GNUNET_PQ_event_notify(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, const void *extra, size_t extra_size)
Notify all that listen on es of an event.
Definition: pq_event.c:537
struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls)
Register callback to be invoked on events of type es.
Definition: pq_event.c:445
static void event_timeout(void *cls)
Function run on timeout for an event.
Definition: pq_event.c:433
static enum GNUNET_GenericReturnValue register_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Re-subscribe to notifications after disconnect.
Definition: pq_event.c:397
static char * sh_to_channel(struct GNUNET_ShortHashCode *sh, char identifier[64])
Convert sh to a Postgres identifier.
Definition: pq_event.c:93
static char * es_to_channel(const struct GNUNET_DB_EventHeaderP *es, char identifier[64])
Convert es to a Postgres identifier.
Definition: pq_event.c:135
static void event_do_poll(struct GNUNET_PQ_Context *db)
Definition: pq_event.c:189
void GNUNET_PQ_event_reconnect_(struct GNUNET_PQ_Context *db, int fd)
Internal API.
Definition: pq_event.c:412
static void manage_subscribe(struct GNUNET_PQ_Context *db, const char *cmd, struct GNUNET_DB_EventHandler *eh)
Helper function to trigger an SQL cmd on db.
Definition: pq_event.c:351
static int do_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Function called on every event handler that needs to be triggered.
Definition: pq_event.c:174
static void scheduler_fd_cb(void *cls, int fd)
Function called when the Postgres FD changes and we need to update the scheduler event loop task.
Definition: pq_event.c:313
static void do_scheduler_notify(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller.
Definition: pq_event.c:272
void GNUNET_PQ_event_listen_cancel(struct GNUNET_DB_EventHandler *eh)
Stop notifications.
Definition: pq_event.c:487
char * GNUNET_PG_get_event_notify_channel(const struct GNUNET_DB_EventHeaderP *es)
Compute the channel that one should notify upon for the given event specification.
Definition: pq_event.c:522
Handle for an active LISTENer to the database.
Definition: pq_event.c:34
void * cb_cls
Closure for cb.
Definition: pq_event.c:48
struct GNUNET_SCHEDULER_Task * timeout_task
Task to run on timeout.
Definition: pq_event.c:58
struct GNUNET_ShortHashCode sh
Channel name.
Definition: pq_event.c:38
struct GNUNET_PQ_Context * db
Database context this event handler is with.
Definition: pq_event.c:53
GNUNET_DB_EventCallback cb
Function to call on events.
Definition: pq_event.c:43
Header of a structure that describes an event channel we may subscribe to or notify on.
Definition: gnunet_db_lib.h:92
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Definition: gnunet_db_lib.h:97
A 512-bit hashcode.
Handle to Postgres database.
Definition: pq.h:36
struct GNUNET_PQ_ExecuteStatement * es
Statements to execute upon connection.
Definition: pq.h:45
Entry in list of pending tasks.
Definition: scheduler.c:136
A 256-bit hashcode.
Time for relative time used by GNUnet, in microseconds.
Closure for do_notify().
Definition: pq_event.c:151
void * extra
Extra argument of the notification, or NULL.
Definition: pq_event.c:155
size_t extra_size
Number of bytes in extra.
Definition: pq_event.c:160