GNUnet  0.11.x
pq_event.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2021 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
26 #include "pq.h"
27 #include <pthread.h>
28 
29 
34 {
38  struct GNUNET_ShortHashCode sh;
39 
44 
48  void *cb_cls;
49 
54 
59 };
60 
61 
68 static void
69 es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70  struct GNUNET_ShortHashCode *sh)
71 {
72  struct GNUNET_HashCode h_channel;
73 
75  ntohs (es->size),
76  &h_channel);
77  GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78  memcpy (sh,
79  &h_channel,
80  sizeof (*sh));
81 }
82 
83 
92 static char *
94  char identifier[64])
95 {
96  char *end;
97 
99  sizeof (*sh),
100  identifier,
101  63);
102  GNUNET_assert (NULL != end);
103  *end = '\0';
104  return end;
105 }
106 
107 
115 static enum GNUNET_GenericReturnValue
116 channel_to_sh (const char *identifier,
117  struct GNUNET_ShortHashCode *sh)
118 {
119  return GNUNET_STRINGS_string_to_data (identifier,
120  strlen (identifier),
121  sh,
122  sizeof (*sh));
123 }
124 
125 
134 static char *
136  char identifier[64])
137 {
138  struct GNUNET_ShortHashCode sh;
139 
140  es_to_sh (es,
141  &sh);
142  return sh_to_channel (&sh,
143  identifier);
144 }
145 
146 
151 {
155  void *extra;
156 
160  size_t extra_size;
161 };
162 
163 
173 static int
174 do_notify (void *cls,
175  const struct GNUNET_ShortHashCode *sh,
176  void *value)
177 {
178  struct NotifyContext *ctx = cls;
179  struct GNUNET_DB_EventHandler *eh = value;
180 
181  eh->cb (eh->cb_cls,
182  ctx->extra,
183  ctx->extra_size);
184  return GNUNET_OK;
185 }
186 
187 
188 static void
190 {
191  PGnotify *n;
192 
194  "PG poll job active\n");
195  if (1 !=
196  PQconsumeInput (db->conn))
198  "Failed to read from Postgres: %s\n",
199  PQerrorMessage (db->conn));
200  while (NULL != (n = PQnotifies (db->conn)))
201  {
202  struct GNUNET_ShortHashCode sh;
203  struct NotifyContext ctx = {
204  .extra = NULL
205  };
206 
207  if ('X' != toupper ((int) n->relname[0]))
208  {
210  "Ignoring notification for unsupported channel identifier `%s'\n",
211  n->relname);
212  PQfreemem (n);
213  continue;
214  }
215  if (GNUNET_OK !=
216  channel_to_sh (&n->relname[1],
217  &sh))
218  {
220  "Ignoring notification for unsupported channel identifier `%s'\n",
221  n->relname);
222  PQfreemem (n);
223  continue;
224  }
225  if ( (NULL != n->extra) &&
226  (GNUNET_OK !=
228  strlen (n->extra),
229  &ctx.extra,
230  &ctx.extra_size)))
231  {
233  "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
234  n->extra,
235  n->relname);
236  PQfreemem (n);
237  continue;
238  }
240  "Received notification %s with extra data `%.*s'\n",
241  n->relname,
242  (int) ctx.extra_size,
243  (const char *) ctx.extra);
245  &sh,
246  &do_notify,
247  &ctx);
248  GNUNET_free (ctx.extra);
249  PQfreemem (n);
250  }
251 }
252 
253 
260 static void
262 {
263  struct GNUNET_PQ_Context *db = cls;
264 
265  db->event_task = NULL;
266  GNUNET_assert (NULL != db->rfd);
267  event_do_poll (db);
269  "Resubscribing\n");
270  db->event_task
272  db->rfd,
274  db);
275 }
276 
277 
285 static void
286 scheduler_fd_cb (void *cls,
287  int fd)
288 {
289  struct GNUNET_PQ_Context *db = cls;
290 
292  "New poll FD is %d\n",
293  fd);
294  if (NULL != db->event_task)
295  {
296  GNUNET_SCHEDULER_cancel (db->event_task);
297  db->event_task = NULL;
298  }
299  GNUNET_free (db->rfd);
300  if (-1 == fd)
301  return;
302  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
303  return;
305  "Activating poll job on %d\n",
306  fd);
308  db->event_task
310  db->rfd,
312  db);
313 }
314 
315 
323 static void
325  const char *cmd,
326  struct GNUNET_DB_EventHandler *eh)
327 {
328  char sql[16 + 64];
329  char *end;
330  PGresult *result;
331 
332  end = stpcpy (sql,
333  cmd);
334  end = sh_to_channel (&eh->sh,
335  end);
337  "Executing PQ command `%s'\n",
338  sql);
339  result = PQexec (db->conn,
340  sql);
341  if (PGRES_COMMAND_OK != PQresultStatus (result))
342  {
344  "pq",
345  "Failed to execute `%s': %s/%s/%s/%s/%s",
346  sql,
347  PQresultErrorField (result,
348  PG_DIAG_MESSAGE_PRIMARY),
349  PQresultErrorField (result,
350  PG_DIAG_MESSAGE_DETAIL),
351  PQresultErrorMessage (result),
352  PQresStatus (PQresultStatus (result)),
353  PQerrorMessage (db->conn));
354  }
355  PQclear (result);
356 }
357 
358 
367 static int
368 register_notify (void *cls,
369  const struct GNUNET_ShortHashCode *sh,
370  void *value)
371 {
372  struct GNUNET_PQ_Context *db = cls;
373  struct GNUNET_DB_EventHandler *eh = value;
374 
376  "LISTEN X",
377  eh);
378  return GNUNET_OK;
379 }
380 
381 
382 void
384  int fd)
385 {
387  "Change in PQ event FD to %d\n",
388  fd);
390  fd);
393  db);
394 }
395 
396 
403 static void
404 event_timeout (void *cls)
405 {
406  struct GNUNET_DB_EventHandler *eh = cls;
407 
408  eh->timeout_task = NULL;
409  eh->cb (eh->cb_cls,
410  NULL,
411  0);
412 }
413 
414 
415 struct GNUNET_DB_EventHandler *
417  const struct GNUNET_DB_EventHeaderP *es,
420  void *cb_cls)
421 {
422  struct GNUNET_DB_EventHandler *eh;
423 
424  eh = GNUNET_new (struct GNUNET_DB_EventHandler);
425  eh->db = db;
426  es_to_sh (es,
427  &eh->sh);
428  eh->cb = cb;
429  eh->cb_cls = cb_cls;
432  &eh->sh,
433  eh,
435  if (NULL == db->event_task)
436  {
438  "Starting event scheduler\n");
440  PQsocket (db->conn));
441  }
443  "LISTEN X",
444  eh);
446  &event_timeout,
447  eh);
448  return eh;
449 }
450 
451 
452 void
454 {
455  struct GNUNET_PQ_Context *db = eh->db;
456 
459  &eh->sh,
460  eh));
462  "UNLISTEN X",
463  eh);
464  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
465  {
467  "Stopping PQ event scheduler job\n");
468  GNUNET_free (db->rfd);
469  if (NULL != db->event_task)
470  {
471  GNUNET_SCHEDULER_cancel (db->event_task);
472  db->event_task = NULL;
473  }
474  }
475  if (NULL != eh->timeout_task)
476  {
478  eh->timeout_task = NULL;
479  }
480  GNUNET_free (eh);
481 }
482 
483 
484 void
486  const struct GNUNET_DB_EventHeaderP *es,
487  const void *extra,
488  size_t extra_size)
489 {
490  char sql[16 + 64 + extra_size * 8 / 5 + 8];
491  char *end;
492  PGresult *result;
493 
494  end = stpcpy (sql,
495  "NOTIFY X");
496  end = es_to_channel (es,
497  end);
498  end = stpcpy (end,
499  ", '");
501  extra_size,
502  end,
503  sizeof (sql) - (end - sql) - 1);
504  GNUNET_assert (NULL != end);
505  *end = '\0';
506  end = stpcpy (end,
507  "'");
509  "Executing command `%s'\n",
510  sql);
511  result = PQexec (db->conn,
512  sql);
513  if (PGRES_COMMAND_OK != PQresultStatus (result))
514  {
516  "pq",
517  "Failed to execute `%s': %s/%s/%s/%s/%s",
518  sql,
519  PQresultErrorField (result,
520  PG_DIAG_MESSAGE_PRIMARY),
521  PQresultErrorField (result,
522  PG_DIAG_MESSAGE_DETAIL),
523  PQresultErrorMessage (result),
524  PQresStatus (PQresultStatus (result)),
525  PQerrorMessage (db->conn));
526  }
527  PQclear (result);
528  event_do_poll (db);
529 }
530 
531 
532 /* end of pq_event.c */
static struct GNUNET_TIME_Relative timeout
Desired timeout for the lookup (default is no timeout).
Definition: gnunet-abd.c:61
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct SolverHandle * sh
static char * value
Value of the record to add/remove.
static int result
Global testing status.
static struct GNUNET_FS_DirectoryBuilder * db
Definition: gnunet-search.c:41
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
#define GNUNET_log(kind,...)
#define GNUNET_log_from(kind, comp,...)
GNUNET_GenericReturnValue
Named constants for return values.
Definition: gnunet_common.h:92
@ GNUNET_OK
Definition: gnunet_common.h:95
#define GNUNET_static_assert(cond)
Assertion to be checked (if supported by C compiler) at compile time, otherwise checked at runtime and resulting in an abort() on failure.
void(* GNUNET_DB_EventCallback)(void *cls, const void *extra, size_t extra_size)
Function called on events received from Postgres.
Definition: gnunet_db_lib.h:79
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
int GNUNET_CONTAINER_multishortmap_iterate(struct GNUNET_CONTAINER_MultiShortmap *map, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map.
int GNUNET_CONTAINER_multishortmap_put(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
int GNUNET_CONTAINER_multishortmap_get_multiple(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map that match a particular key.
unsigned int GNUNET_CONTAINER_multishortmap_size(const struct GNUNET_CONTAINER_MultiShortmap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multishortmap_remove(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, const void *value)
Remove the given key-value pair from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_NETWORK_Handle * GNUNET_NETWORK_socket_box_native(int fd)
Box a native socket (and check that it is a socket).
Definition: network.c:636
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net(struct GNUNET_TIME_Relative delay, struct GNUNET_NETWORK_Handle *rfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay or when the specified file descriptor is ready for reading.
Definition: scheduler.c:1517
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data_alloc(const char *enc, size_t enclen, void **out, size_t *out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:841
char * GNUNET_STRINGS_data_to_string(const void *data, size_t size, char *out, size_t out_size)
Convert binary data to ASCII encoding using CrockfordBase32.
Definition: strings.c:695
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data(const char *enc, size_t enclen, void *out, size_t out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:775
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
shared internal data structures of libgnunetpq
static int register_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Re-subscribe to notifications after disconnect.
Definition: pq_event.c:368
static enum GNUNET_GenericReturnValue channel_to_sh(const char *identifier, struct GNUNET_ShortHashCode *sh)
Convert a Postgres channel identifier back to a short hash (sh).
Definition: pq_event.c:116
static void es_to_sh(const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_ShortHashCode *sh)
Convert es to a short hash.
Definition: pq_event.c:69
void GNUNET_PQ_event_notify(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, const void *extra, size_t extra_size)
Notify all that listen on es of an event.
Definition: pq_event.c:485
struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls)
Register callback to be invoked on events of type es.
Definition: pq_event.c:416
static void event_timeout(void *cls)
Function run on timeout for an event.
Definition: pq_event.c:404
static char * sh_to_channel(struct GNUNET_ShortHashCode *sh, char identifier[64])
Convert sh to a Postgres identifier.
Definition: pq_event.c:93
static char * es_to_channel(const struct GNUNET_DB_EventHeaderP *es, char identifier[64])
Convert es to a Postgres identifier.
Definition: pq_event.c:135
static void event_do_poll(struct GNUNET_PQ_Context *db)
Definition: pq_event.c:189
void GNUNET_PQ_event_reconnect_(struct GNUNET_PQ_Context *db, int fd)
Internal API.
Definition: pq_event.c:383
static void manage_subscribe(struct GNUNET_PQ_Context *db, const char *cmd, struct GNUNET_DB_EventHandler *eh)
Helper function to trigger an SQL cmd on db.
Definition: pq_event.c:324
static int do_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Function called on every event handler that needs to be triggered.
Definition: pq_event.c:174
static void scheduler_fd_cb(void *cls, int fd)
Function called when the Postgres FD changes and we need to update the scheduler event loop task.
Definition: pq_event.c:286
static void do_scheduler_notify(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller.
Definition: pq_event.c:261
void GNUNET_PQ_event_listen_cancel(struct GNUNET_DB_EventHandler *eh)
Stop notifications.
Definition: pq_event.c:453
Handle for an active LISTENer to the database.
Definition: pq_event.c:34
void * cb_cls
Closure for cb.
Definition: pq_event.c:48
struct GNUNET_SCHEDULER_Task * timeout_task
Task to run on timeout.
Definition: pq_event.c:58
struct GNUNET_ShortHashCode sh
Channel name.
Definition: pq_event.c:38
struct GNUNET_PQ_Context * db
Database context this event handler is with.
Definition: pq_event.c:53
GNUNET_DB_EventCallback cb
Function to call on events.
Definition: pq_event.c:43
Header of a structure that describes an event channel we may subscribe to or notify on.
Definition: gnunet_db_lib.h:91
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Definition: gnunet_db_lib.h:96
A 512-bit hashcode.
Handle to Postgres database.
Definition: pq.h:36
struct GNUNET_PQ_ExecuteStatement * es
Statements to execute upon connection.
Definition: pq.h:45
Entry in list of pending tasks.
Definition: scheduler.c:135
A 256-bit hashcode.
Time for relative time used by GNUnet, in microseconds.
Closure for do_notify().
Definition: pq_event.c:151
void * extra
Extra argument of the notification, or NULL.
Definition: pq_event.c:155
size_t extra_size
Number of bytes in extra.
Definition: pq_event.c:160