GNUnet  0.17.5
pq_event.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2021 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
26 #include "pq.h"
27 #include <pthread.h>
28 
29 
34 {
38  struct GNUNET_ShortHashCode sh;
39 
44 
48  void *cb_cls;
49 
54 
59 };
60 
61 
68 static void
69 es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70  struct GNUNET_ShortHashCode *sh)
71 {
72  struct GNUNET_HashCode h_channel;
73 
75  ntohs (es->size),
76  &h_channel);
77  GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78  memcpy (sh,
79  &h_channel,
80  sizeof (*sh));
81 }
82 
83 
92 static char *
94  char identifier[64])
95 {
96  char *end;
97 
99  sizeof (*sh),
100  identifier,
101  63);
102  GNUNET_assert (NULL != end);
103  *end = '\0';
104  return end;
105 }
106 
107 
115 static enum GNUNET_GenericReturnValue
116 channel_to_sh (const char *identifier,
117  struct GNUNET_ShortHashCode *sh)
118 {
119  return GNUNET_STRINGS_string_to_data (identifier,
120  strlen (identifier),
121  sh,
122  sizeof (*sh));
123 }
124 
125 
134 static char *
136  char identifier[64])
137 {
138  struct GNUNET_ShortHashCode sh;
139 
140  es_to_sh (es,
141  &sh);
142  return sh_to_channel (&sh,
143  identifier);
144 }
145 
146 
151 {
155  void *extra;
156 
160  size_t extra_size;
161 };
162 
163 
173 static int
174 do_notify (void *cls,
175  const struct GNUNET_ShortHashCode *sh,
176  void *value)
177 {
178  struct NotifyContext *ctx = cls;
179  struct GNUNET_DB_EventHandler *eh = value;
180 
181  eh->cb (eh->cb_cls,
182  ctx->extra,
183  ctx->extra_size);
184  return GNUNET_OK;
185 }
186 
187 
188 static void
190 {
191  PGnotify *n;
192  unsigned int cnt = 0;
193 
195  "PG poll job active\n");
196  if (1 !=
197  PQconsumeInput (db->conn))
198  {
200  "Failed to read from Postgres: %s\n",
201  PQerrorMessage (db->conn));
202  if (CONNECTION_BAD != PQstatus (db->conn))
203  return;
205  return;
206  }
207  while (NULL != (n = PQnotifies (db->conn)))
208  {
209  struct GNUNET_ShortHashCode sh;
210  struct NotifyContext ctx = {
211  .extra = NULL
212  };
213 
214  cnt++;
215  if ('X' != toupper ((int) n->relname[0]))
216  {
218  "Ignoring notification for unsupported channel identifier `%s'\n",
219  n->relname);
220  PQfreemem (n);
221  continue;
222  }
223  if (GNUNET_OK !=
224  channel_to_sh (&n->relname[1],
225  &sh))
226  {
228  "Ignoring notification for unsupported channel identifier `%s'\n",
229  n->relname);
230  PQfreemem (n);
231  continue;
232  }
233  if ( (NULL != n->extra) &&
234  (GNUNET_OK !=
236  strlen (n->extra),
237  &ctx.extra,
238  &ctx.extra_size)))
239  {
241  "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
242  n->extra,
243  n->relname);
244  PQfreemem (n);
245  continue;
246  }
248  "Received notification %s with extra data `%.*s'\n",
249  n->relname,
250  (int) ctx.extra_size,
251  (const char *) ctx.extra);
253  &sh,
254  &do_notify,
255  &ctx);
256  GNUNET_free (ctx.extra);
257  PQfreemem (n);
258  }
260  "PG poll job finishes after %u events\n",
261  cnt);
262 }
263 
264 
271 static void
273 {
274  struct GNUNET_PQ_Context *db = cls;
275 
276  db->event_task = NULL;
277  if (NULL == db->rfd)
279  event_do_poll (db);
280  if (NULL != db->event_task)
281  return;
283  "Resubscribing\n");
284  if (NULL == db->rfd)
285  {
288  db);
289  return;
290  }
291  db->event_task
293  db->rfd,
295  db);
296 }
297 
298 
306 static void
307 scheduler_fd_cb (void *cls,
308  int fd)
309 {
310  struct GNUNET_PQ_Context *db = cls;
311 
313  "New poll FD is %d\n",
314  fd);
315  if (NULL != db->event_task)
316  {
317  GNUNET_SCHEDULER_cancel (db->event_task);
318  db->event_task = NULL;
319  }
320  GNUNET_free (db->rfd);
321  if (-1 == fd)
322  return;
323  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
324  return;
326  "Activating poll job on %d\n",
327  fd);
329  db->event_task
331  db->rfd,
333  db);
334 }
335 
336 
344 static void
346  const char *cmd,
347  struct GNUNET_DB_EventHandler *eh)
348 {
349  char sql[16 + 64];
350  char *end;
351  PGresult *result;
352 
353  if (NULL == db->conn)
354  return;
355  end = stpcpy (sql,
356  cmd);
357  end = sh_to_channel (&eh->sh,
358  end);
360  "Executing PQ command `%s'\n",
361  sql);
362  result = PQexec (db->conn,
363  sql);
364  if (PGRES_COMMAND_OK != PQresultStatus (result))
365  {
367  "pq",
368  "Failed to execute `%s': %s/%s/%s/%s/%s",
369  sql,
370  PQresultErrorField (result,
371  PG_DIAG_MESSAGE_PRIMARY),
372  PQresultErrorField (result,
373  PG_DIAG_MESSAGE_DETAIL),
374  PQresultErrorMessage (result),
375  PQresStatus (PQresultStatus (result)),
376  PQerrorMessage (db->conn));
377  }
378  PQclear (result);
379 }
380 
381 
390 static int
391 register_notify (void *cls,
392  const struct GNUNET_ShortHashCode *sh,
393  void *value)
394 {
395  struct GNUNET_PQ_Context *db = cls;
396  struct GNUNET_DB_EventHandler *eh = value;
397 
399  "LISTEN X",
400  eh);
401  return GNUNET_OK;
402 }
403 
404 
405 void
407  int fd)
408 {
410  "Change in PQ event FD to %d\n",
411  fd);
413  fd);
416  db);
417 }
418 
419 
426 static void
427 event_timeout (void *cls)
428 {
429  struct GNUNET_DB_EventHandler *eh = cls;
430 
431  eh->timeout_task = NULL;
432  eh->cb (eh->cb_cls,
433  NULL,
434  0);
435 }
436 
437 
438 struct GNUNET_DB_EventHandler *
440  const struct GNUNET_DB_EventHeaderP *es,
443  void *cb_cls)
444 {
445  struct GNUNET_DB_EventHandler *eh;
446  bool sub;
447 
448  eh = GNUNET_new (struct GNUNET_DB_EventHandler);
449  eh->db = db;
450  es_to_sh (es,
451  &eh->sh);
452  eh->cb = cb;
453  eh->cb_cls = cb_cls;
454  sub = (NULL ==
456  &eh->sh));
459  &eh->sh,
460  eh,
462  if (NULL == db->event_task)
463  {
465  "Starting event scheduler\n");
467  PQsocket (db->conn));
468  }
469  if (sub)
471  "LISTEN X",
472  eh);
474  &event_timeout,
475  eh);
476  return eh;
477 }
478 
479 
480 void
482 {
483  struct GNUNET_PQ_Context *db = eh->db;
484 
487  &eh->sh,
488  eh));
489  if (NULL ==
491  &eh->sh))
493  "UNLISTEN X",
494  eh);
495  if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
496  {
498  "Stopping PQ event scheduler job\n");
499  GNUNET_free (db->rfd);
500  if (NULL != db->event_task)
501  {
502  GNUNET_SCHEDULER_cancel (db->event_task);
503  db->event_task = NULL;
504  }
505  }
506  if (NULL != eh->timeout_task)
507  {
509  eh->timeout_task = NULL;
510  }
511  GNUNET_free (eh);
512 }
513 
514 
515 void
517  const struct GNUNET_DB_EventHeaderP *es,
518  const void *extra,
519  size_t extra_size)
520 {
521  char sql[16 + 64 + extra_size * 8 / 5 + 8];
522  char *end;
523  PGresult *result;
524 
525  end = stpcpy (sql,
526  "NOTIFY X");
527  end = es_to_channel (es,
528  end);
529  end = stpcpy (end,
530  ", '");
532  extra_size,
533  end,
534  sizeof (sql) - (end - sql) - 1);
535  GNUNET_assert (NULL != end);
536  *end = '\0';
537  end = stpcpy (end,
538  "'");
540  "Executing command `%s'\n",
541  sql);
542  result = PQexec (db->conn,
543  sql);
544  if (PGRES_COMMAND_OK != PQresultStatus (result))
545  {
547  "pq",
548  "Failed to execute `%s': %s/%s/%s/%s/%s",
549  sql,
550  PQresultErrorField (result,
551  PG_DIAG_MESSAGE_PRIMARY),
552  PQresultErrorField (result,
553  PG_DIAG_MESSAGE_DETAIL),
554  PQresultErrorMessage (result),
555  PQresStatus (PQresultStatus (result)),
556  PQerrorMessage (db->conn));
557  }
558  PQclear (result);
559  event_do_poll (db);
560 }
561 
562 
563 /* end of pq_event.c */
static struct GNUNET_TIME_Relative timeout
Desired timeout for the lookup (default is no timeout).
Definition: gnunet-abd.c:61
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:34
static struct SolverHandle * sh
static char * value
Value of the record to add/remove.
static int result
Global testing status.
static struct GNUNET_FS_DirectoryBuilder * db
Definition: gnunet-search.c:94
static struct GNUNET_DNSSTUB_Context * ctx
Context for DNS resolution.
void(* GNUNET_DB_EventCallback)(void *cls, const void *extra, size_t extra_size)
Function called on events received from Postgres.
Definition: gnunet_db_lib.h:79
void GNUNET_PQ_reconnect(struct GNUNET_PQ_Context *db)
Reinitialize the database db.
Definition: pq_connect.c:325
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multishortmap_put(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
int GNUNET_CONTAINER_multishortmap_iterate(struct GNUNET_CONTAINER_MultiShortmap *map, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map.
int GNUNET_CONTAINER_multishortmap_get_multiple(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map that match a particular key.
void * GNUNET_CONTAINER_multishortmap_get(const struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key)
Given a key find a value in the map matching the key.
unsigned int GNUNET_CONTAINER_multishortmap_size(const struct GNUNET_CONTAINER_MultiShortmap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multishortmap_remove(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, const void *value)
Remove the given key-value pair from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_log(kind,...)
#define GNUNET_log_from(kind, comp,...)
GNUNET_GenericReturnValue
Named constants for return values.
Definition: gnunet_common.h:96
#define GNUNET_static_assert(cond)
Assertion to be checked (if supported by C compiler) at compile time, otherwise checked at runtime an...
@ GNUNET_OK
Definition: gnunet_common.h:99
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_NETWORK_Handle * GNUNET_NETWORK_socket_box_native(int fd)
Box a native socket (and check that it is a socket).
Definition: network.c:584
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net(struct GNUNET_TIME_Relative delay, struct GNUNET_NETWORK_Handle *rfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay or when the specified file descriptor is ready f...
Definition: scheduler.c:1502
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:957
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1254
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data_alloc(const char *enc, size_t enclen, void **out, size_t *out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:855
char * GNUNET_STRINGS_data_to_string(const void *data, size_t size, char *out, size_t out_size)
Convert binary data to ASCII encoding using CrockfordBase32.
Definition: strings.c:709
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data(const char *enc, size_t enclen, void *out, size_t out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:789
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
#define GNUNET_TIME_UNIT_SECONDS
One second.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
shared internal data structures of libgnunetpq
static int register_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Re-subscribe to notifications after disconnect.
Definition: pq_event.c:391
static enum GNUNET_GenericReturnValue channel_to_sh(const char *identifier, struct GNUNET_ShortHashCode *sh)
Convert sh to a Postgres identifier.
Definition: pq_event.c:116
static void es_to_sh(const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_ShortHashCode *sh)
Convert es to a short hash.
Definition: pq_event.c:69
void GNUNET_PQ_event_notify(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, const void *extra, size_t extra_size)
Notify all that listen on es of an event.
Definition: pq_event.c:516
struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls)
Register callback to be invoked on events of type es.
Definition: pq_event.c:439
static void event_timeout(void *cls)
Function run on timeout for an event.
Definition: pq_event.c:427
static char * sh_to_channel(struct GNUNET_ShortHashCode *sh, char identifier[64])
Convert sh to a Postgres identifier.
Definition: pq_event.c:93
static char * es_to_channel(const struct GNUNET_DB_EventHeaderP *es, char identifier[64])
Convert es to a Postgres identifier.
Definition: pq_event.c:135
static void event_do_poll(struct GNUNET_PQ_Context *db)
Definition: pq_event.c:189
void GNUNET_PQ_event_reconnect_(struct GNUNET_PQ_Context *db, int fd)
Internal API.
Definition: pq_event.c:406
static void manage_subscribe(struct GNUNET_PQ_Context *db, const char *cmd, struct GNUNET_DB_EventHandler *eh)
Helper function to trigger an SQL cmd on db.
Definition: pq_event.c:345
static int do_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Function called on every event handler that needs to be triggered.
Definition: pq_event.c:174
static void scheduler_fd_cb(void *cls, int fd)
Function called when the Postgres FD changes and we need to update the scheduler event loop task.
Definition: pq_event.c:307
static void do_scheduler_notify(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller.
Definition: pq_event.c:272
void GNUNET_PQ_event_listen_cancel(struct GNUNET_DB_EventHandler *eh)
Stop notifications.
Definition: pq_event.c:481
Handle for an active LISTENer to the database.
Definition: pq_event.c:34
void * cb_cls
Closure for cb.
Definition: pq_event.c:48
struct GNUNET_SCHEDULER_Task * timeout_task
Task to run on timeout.
Definition: pq_event.c:58
struct GNUNET_ShortHashCode sh
Channel name.
Definition: pq_event.c:38
struct GNUNET_PQ_Context * db
Database context this event handler is with.
Definition: pq_event.c:53
GNUNET_DB_EventCallback cb
Function to call on events.
Definition: pq_event.c:43
Header of a structure that describes an event channel we may subscribe to or notify on.
Definition: gnunet_db_lib.h:91
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Definition: gnunet_db_lib.h:96
A 512-bit hashcode.
Handle to Postgres database.
Definition: pq.h:36
struct GNUNET_PQ_ExecuteStatement * es
Statements to execute upon connection.
Definition: pq.h:45
Entry in list of pending tasks.
Definition: scheduler.c:135
A 256-bit hashcode.
Time for relative time used by GNUnet, in microseconds.
Closure for do_notify().
Definition: pq_event.c:151
void * extra
Extra argument of the notification, or NULL.
Definition: pq_event.c:155
size_t extra_size
Number of bytes in extra.
Definition: pq_event.c:160