GNUnet  0.11.x
pq_event.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2021 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
26 #include "pq.h"
27 #include <pthread.h>
28 
29 
34 {
38  struct GNUNET_ShortHashCode sh;
39 
44 
48  void *cb_cls;
49 
54 
59 };
60 
61 
68 static void
69 es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70  struct GNUNET_ShortHashCode *sh)
71 {
72  struct GNUNET_HashCode h_channel;
73 
75  ntohs (es->size),
76  &h_channel);
77  GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78  memcpy (sh,
79  &h_channel,
80  sizeof (*sh));
81 }
82 
83 
92 static char *
94  char identifier[64])
95 {
96  char *end;
97 
99  sizeof (*sh),
100  identifier,
101  63);
102  GNUNET_assert (NULL != end);
103  *end = '\0';
104  return end;
105 }
106 
107 
115 static enum GNUNET_GenericReturnValue
116 channel_to_sh (const char *identifier,
117  struct GNUNET_ShortHashCode *sh)
118 {
119  return GNUNET_STRINGS_string_to_data (identifier,
120  strlen (identifier),
121  sh,
122  sizeof (*sh));
123 }
124 
125 
134 static char *
136  char identifier[64])
137 {
138  struct GNUNET_ShortHashCode sh;
139 
140  es_to_sh (es,
141  &sh);
142  return sh_to_channel (&sh,
143  identifier);
144 }
145 
146 
151 {
155  void *extra;
156 
160  size_t extra_size;
161 };
162 
163 
173 static int
174 do_notify (void *cls,
175  const struct GNUNET_ShortHashCode *sh,
176  void *value)
177 {
178  struct NotifyContext *ctx = cls;
179  struct GNUNET_DB_EventHandler *eh = value;
180 
181  eh->cb (eh->cb_cls,
182  ctx->extra,
183  ctx->extra_size);
184  return GNUNET_OK;
185 }
186 
187 
188 static void
190 {
191  PGnotify *n;
192 
194  "PG poll job active\n");
195  if (1 !=
196  PQconsumeInput (db->conn))
197  {
199  "Failed to read from Postgres: %s\n",
200  PQerrorMessage (db->conn));
201  if (CONNECTION_BAD != PQstatus (db->conn))
202  return;
204  return;
205  }
206  while (NULL != (n = PQnotifies (db->conn)))
207  {
208  struct GNUNET_ShortHashCode sh;
209  struct NotifyContext ctx = {
210  .extra = NULL
211  };
212 
213  if ('X' != toupper ((int) n->relname[0]))
214  {
216  "Ignoring notification for unsupported channel identifier `%s'\n",
217  n->relname);
218  PQfreemem (n);
219  continue;
220  }
221  if (GNUNET_OK !=
222  channel_to_sh (&n->relname[1],
223  &sh))
224  {
226  "Ignoring notification for unsupported channel identifier `%s'\n",
227  n->relname);
228  PQfreemem (n);
229  continue;
230  }
231  if ( (NULL != n->extra) &&
232  (GNUNET_OK !=
234  strlen (n->extra),
235  &ctx.extra,
236  &ctx.extra_size)))
237  {
239  "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
240  n->extra,
241  n->relname);
242  PQfreemem (n);
243  continue;
244  }
246  "Received notification %s with extra data `%.*s'\n",
247  n->relname,
248  (int) ctx.extra_size,
249  (const char *) ctx.extra);
251  &sh,
252  &do_notify,
253  &ctx);
254  GNUNET_free (ctx.extra);
255  PQfreemem (n);
256  }
257 }
258 
259 
266 static void
268 {
269  struct GNUNET_PQ_Context *db = cls;
270 
271  db->event_task = NULL;
272  if (NULL == db->rfd)
274  event_do_poll (db);
275  if (NULL != db->event_task)
276  return;
278  "Resubscribing\n");
279  if (NULL == db->rfd)
280  {
283  db);
284  return;
285  }
286  db->event_task
288  db->rfd,
290  db);
291 }
292 
293 
301 static void
302 scheduler_fd_cb (void *cls,
303  int fd)
304 {
305  struct GNUNET_PQ_Context *db = cls;
306 
308  "New poll FD is %d\n",
309  fd);
310  if (NULL != db->event_task)
311  {
312  GNUNET_SCHEDULER_cancel (db->event_task);
313  db->event_task = NULL;
314  }
315  GNUNET_free (db->rfd);
316  if (-1 == fd)
317  return;
318  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
319  return;
321  "Activating poll job on %d\n",
322  fd);
324  db->event_task
326  db->rfd,
328  db);
329 }
330 
331 
339 static void
341  const char *cmd,
342  struct GNUNET_DB_EventHandler *eh)
343 {
344  char sql[16 + 64];
345  char *end;
346  PGresult *result;
347 
348  if (NULL == db->conn)
349  return;
350  end = stpcpy (sql,
351  cmd);
352  end = sh_to_channel (&eh->sh,
353  end);
355  "Executing PQ command `%s'\n",
356  sql);
357  result = PQexec (db->conn,
358  sql);
359  if (PGRES_COMMAND_OK != PQresultStatus (result))
360  {
362  "pq",
363  "Failed to execute `%s': %s/%s/%s/%s/%s",
364  sql,
365  PQresultErrorField (result,
366  PG_DIAG_MESSAGE_PRIMARY),
367  PQresultErrorField (result,
368  PG_DIAG_MESSAGE_DETAIL),
369  PQresultErrorMessage (result),
370  PQresStatus (PQresultStatus (result)),
371  PQerrorMessage (db->conn));
372  }
373  PQclear (result);
374 }
375 
376 
385 static int
386 register_notify (void *cls,
387  const struct GNUNET_ShortHashCode *sh,
388  void *value)
389 {
390  struct GNUNET_PQ_Context *db = cls;
391  struct GNUNET_DB_EventHandler *eh = value;
392 
394  "LISTEN X",
395  eh);
396  return GNUNET_OK;
397 }
398 
399 
400 void
402  int fd)
403 {
405  "Change in PQ event FD to %d\n",
406  fd);
408  fd);
411  db);
412 }
413 
414 
421 static void
422 event_timeout (void *cls)
423 {
424  struct GNUNET_DB_EventHandler *eh = cls;
425 
426  eh->timeout_task = NULL;
427  eh->cb (eh->cb_cls,
428  NULL,
429  0);
430 }
431 
432 
433 struct GNUNET_DB_EventHandler *
435  const struct GNUNET_DB_EventHeaderP *es,
438  void *cb_cls)
439 {
440  struct GNUNET_DB_EventHandler *eh;
441  bool sub;
442 
443  eh = GNUNET_new (struct GNUNET_DB_EventHandler);
444  eh->db = db;
445  es_to_sh (es,
446  &eh->sh);
447  eh->cb = cb;
448  eh->cb_cls = cb_cls;
449  sub = (NULL ==
451  &eh->sh));
454  &eh->sh,
455  eh,
457  if (NULL == db->event_task)
458  {
460  "Starting event scheduler\n");
462  PQsocket (db->conn));
463  }
464  if (sub)
466  "LISTEN X",
467  eh);
469  &event_timeout,
470  eh);
471  return eh;
472 }
473 
474 
475 void
477 {
478  struct GNUNET_PQ_Context *db = eh->db;
479 
482  &eh->sh,
483  eh));
484  if (NULL ==
486  &eh->sh))
488  "UNLISTEN X",
489  eh);
490  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
491  {
493  "Stopping PQ event scheduler job\n");
494  GNUNET_free (db->rfd);
495  if (NULL != db->event_task)
496  {
497  GNUNET_SCHEDULER_cancel (db->event_task);
498  db->event_task = NULL;
499  }
500  }
501  if (NULL != eh->timeout_task)
502  {
504  eh->timeout_task = NULL;
505  }
506  GNUNET_free (eh);
507 }
508 
509 
510 void
512  const struct GNUNET_DB_EventHeaderP *es,
513  const void *extra,
514  size_t extra_size)
515 {
516  char sql[16 + 64 + extra_size * 8 / 5 + 8];
517  char *end;
518  PGresult *result;
519 
520  end = stpcpy (sql,
521  "NOTIFY X");
522  end = es_to_channel (es,
523  end);
524  end = stpcpy (end,
525  ", '");
527  extra_size,
528  end,
529  sizeof (sql) - (end - sql) - 1);
530  GNUNET_assert (NULL != end);
531  *end = '\0';
532  end = stpcpy (end,
533  "'");
535  "Executing command `%s'\n",
536  sql);
537  result = PQexec (db->conn,
538  sql);
539  if (PGRES_COMMAND_OK != PQresultStatus (result))
540  {
542  "pq",
543  "Failed to execute `%s': %s/%s/%s/%s/%s",
544  sql,
545  PQresultErrorField (result,
546  PG_DIAG_MESSAGE_PRIMARY),
547  PQresultErrorField (result,
548  PG_DIAG_MESSAGE_DETAIL),
549  PQresultErrorMessage (result),
550  PQresStatus (PQresultStatus (result)),
551  PQerrorMessage (db->conn));
552  }
553  PQclear (result);
554  event_do_poll (db);
555 }
556 
557 
558 /* end of pq_event.c */
static struct GNUNET_TIME_Relative timeout
Desired timeout for the lookup (default is no timeout).
Definition: gnunet-abd.c:61
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct SolverHandle * sh
static char * value
Value of the record to add/remove.
static int result
Global testing status.
static struct GNUNET_FS_DirectoryBuilder * db
Definition: gnunet-search.c:41
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
#define GNUNET_log(kind,...)
#define GNUNET_log_from(kind, comp,...)
GNUNET_GenericReturnValue
Named constants for return values.
Definition: gnunet_common.h:92
@ GNUNET_OK
Definition: gnunet_common.h:95
#define GNUNET_static_assert(cond)
Assertion to be checked (if supported by C compiler) at compile time, otherwise checked at runtime an...
void(* GNUNET_DB_EventCallback)(void *cls, const void *extra, size_t extra_size)
Function called on events received from Postgres.
Definition: gnunet_db_lib.h:79
void GNUNET_PQ_reconnect(struct GNUNET_PQ_Context *db)
Reinitialize the database db.
Definition: pq_connect.c:313
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
int GNUNET_CONTAINER_multishortmap_iterate(struct GNUNET_CONTAINER_MultiShortmap *map, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map.
int GNUNET_CONTAINER_multishortmap_put(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
int GNUNET_CONTAINER_multishortmap_get_multiple(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map that match a particular key.
void * GNUNET_CONTAINER_multishortmap_get(const struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key)
Given a key find a value in the map matching the key.
unsigned int GNUNET_CONTAINER_multishortmap_size(const struct GNUNET_CONTAINER_MultiShortmap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multishortmap_remove(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, const void *value)
Remove the given key-value pair from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_NETWORK_Handle * GNUNET_NETWORK_socket_box_native(int fd)
Box a native socket (and check that it is a socket).
Definition: network.c:583
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net(struct GNUNET_TIME_Relative delay, struct GNUNET_NETWORK_Handle *rfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay or when the specified file descriptor is ready f...
Definition: scheduler.c:1517
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data_alloc(const char *enc, size_t enclen, void **out, size_t *out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:828
char * GNUNET_STRINGS_data_to_string(const void *data, size_t size, char *out, size_t out_size)
Convert binary data to ASCII encoding using CrockfordBase32.
Definition: strings.c:682
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data(const char *enc, size_t enclen, void *out, size_t out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:762
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
#define GNUNET_TIME_UNIT_SECONDS
One second.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
shared internal data structures of libgnunetpq
static int register_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Re-subscribe to notifications after disconnect.
Definition: pq_event.c:386
static enum GNUNET_GenericReturnValue channel_to_sh(const char *identifier, struct GNUNET_ShortHashCode *sh)
Convert sh to a Postgres identifier.
Definition: pq_event.c:116
static void es_to_sh(const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_ShortHashCode *sh)
Convert es to a short hash.
Definition: pq_event.c:69
void GNUNET_PQ_event_notify(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, const void *extra, size_t extra_size)
Notify all that listen on es of an event.
Definition: pq_event.c:511
struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls)
Register callback to be invoked on events of type es.
Definition: pq_event.c:434
static void event_timeout(void *cls)
Function run on timeout for an event.
Definition: pq_event.c:422
static char * sh_to_channel(struct GNUNET_ShortHashCode *sh, char identifier[64])
Convert sh to a Postgres identifier.
Definition: pq_event.c:93
static char * es_to_channel(const struct GNUNET_DB_EventHeaderP *es, char identifier[64])
Convert es to a Postgres identifier.
Definition: pq_event.c:135
static void event_do_poll(struct GNUNET_PQ_Context *db)
Definition: pq_event.c:189
void GNUNET_PQ_event_reconnect_(struct GNUNET_PQ_Context *db, int fd)
Internal API.
Definition: pq_event.c:401
static void manage_subscribe(struct GNUNET_PQ_Context *db, const char *cmd, struct GNUNET_DB_EventHandler *eh)
Helper function to trigger an SQL cmd on db.
Definition: pq_event.c:340
static int do_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Function called on every event handler that needs to be triggered.
Definition: pq_event.c:174
static void scheduler_fd_cb(void *cls, int fd)
Function called when the Postgres FD changes and we need to update the scheduler event loop task.
Definition: pq_event.c:302
static void do_scheduler_notify(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller.
Definition: pq_event.c:267
void GNUNET_PQ_event_listen_cancel(struct GNUNET_DB_EventHandler *eh)
Stop notifications.
Definition: pq_event.c:476
Handle for an active LISTENer to the database.
Definition: pq_event.c:34
void * cb_cls
Closure for cb.
Definition: pq_event.c:48
struct GNUNET_SCHEDULER_Task * timeout_task
Task to run on timeout.
Definition: pq_event.c:58
struct GNUNET_ShortHashCode sh
Channel name.
Definition: pq_event.c:38
struct GNUNET_PQ_Context * db
Database context this event handler is with.
Definition: pq_event.c:53
GNUNET_DB_EventCallback cb
Function to call on events.
Definition: pq_event.c:43
Header of a structure that describes an event channel we may subscribe to or notify on.
Definition: gnunet_db_lib.h:91
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Definition: gnunet_db_lib.h:96
A 512-bit hashcode.
Handle to Postgres database.
Definition: pq.h:36
struct GNUNET_PQ_ExecuteStatement * es
Statements to execute upon connection.
Definition: pq.h:45
Entry in list of pending tasks.
Definition: scheduler.c:135
A 256-bit hashcode.
Time for relative time used by GNUnet, in microseconds.
Closure for do_notify().
Definition: pq_event.c:151
void * extra
Extra argument of the notification, or NULL.
Definition: pq_event.c:155
size_t extra_size
Number of bytes in extra.
Definition: pq_event.c:160