GNUnet 0.24.1-15-gab6ed22f1
pq_event.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021, 2023 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL-3.0-or-later
19 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
34{
39
44
48 void *cb_cls;
49
54
59};
60
61
68static void
71{
72 struct GNUNET_HashCode h_channel;
73
75 ntohs (es->size),
76 &h_channel);
77 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78 memcpy (sh,
79 &h_channel,
80 sizeof (*sh));
81}
82
83
92static char *
94 char identifier[64])
95{
96 char *end;
97
99 sizeof (*sh),
100 identifier,
101 63);
102 GNUNET_assert (NULL != end);
103 *end = '\0';
104 return end;
105}
106
107
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
134static char *
136 char identifier[64])
137{
139
140 es_to_sh (es,
141 &sh);
142 return sh_to_channel (&sh,
143 identifier);
144}
145
146
/**
 * Closure for #do_notify().
 */
struct NotifyContext
{
  /**
   * Extra argument of the notification, or NULL.
   */
  void *extra;

  /**
   * Number of bytes in @e extra.
   */
  size_t extra_size;
};
162
163
174do_notify (void *cls,
175 const struct GNUNET_ShortHashCode *sh,
176 void *value)
177{
178 struct NotifyContext *ctx = cls;
179 struct GNUNET_DB_EventHandler *eh = value;
180
181 eh->cb (eh->cb_cls,
182 ctx->extra,
183 ctx->extra_size);
184 return GNUNET_OK;
185}
186
187
188void
190{
191 PGnotify *n;
192 unsigned int cnt = 0;
193
195 "PG poll job active\n");
196 if (1 !=
197 PQconsumeInput (db->conn))
198 {
200 "Failed to read from Postgres: %s\n",
201 PQerrorMessage (db->conn));
202 if (CONNECTION_BAD != PQstatus (db->conn))
203 return;
205 return;
206 }
207 while (NULL != (n = PQnotifies (db->conn)))
208 {
210 struct NotifyContext ctx = {
211 .extra = NULL
212 };
213
214 cnt++;
215 if ('X' != toupper ((int) n->relname[0]))
216 {
218 "Ignoring notification for unsupported channel identifier `%s'\n",
219 n->relname);
220 PQfreemem (n);
221 continue;
222 }
223 if (GNUNET_OK !=
224 channel_to_sh (&n->relname[1],
225 &sh))
226 {
228 "Ignoring notification for unsupported channel identifier `%s'\n",
229 n->relname);
230 PQfreemem (n);
231 continue;
232 }
233 if ( (NULL != n->extra) &&
234 (GNUNET_OK !=
236 strlen (n->extra),
237 &ctx.extra,
238 &ctx.extra_size)))
239 {
241 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
242 n->extra,
243 n->relname);
244 PQfreemem (n);
245 continue;
246 }
248 "Received notification %s with extra data `%.*s'\n",
249 n->relname,
250 (int) ctx.extra_size,
251 (const char *) ctx.extra);
253 &sh,
254 &do_notify,
255 &ctx);
256 GNUNET_free (ctx.extra);
257 PQfreemem (n);
258 }
260 "PG poll job finishes after %u events\n",
261 cnt);
262}
263
264
271static void
273{
274 struct GNUNET_PQ_Context *db = cls;
275
276 db->event_task = NULL;
277 if (NULL == db->rfd)
280 if (NULL != db->event_task)
281 {
282 /* GNUNET_PQ_reconnect() above could have actually
283 created another event_task, stop it */
284 GNUNET_SCHEDULER_cancel (db->event_task);
285 db->event_task = NULL;
286 }
288 "Resubscribing\n");
289 if (NULL == db->rfd)
290 {
291 db->resubscribe_backoff
292 = GNUNET_TIME_relative_max (db->resubscribe_backoff,
294 db->resubscribe_backoff
295 = GNUNET_TIME_STD_BACKOFF (db->resubscribe_backoff);
298 db);
299 return;
300 }
301 db->resubscribe_backoff = GNUNET_TIME_UNIT_SECONDS;
302 db->event_task
304 db->rfd,
306 db);
307}
308
309
317static void
318do_poll (void *cls)
319{
320 struct GNUNET_PQ_Context *db = cls;
321
322 db->poller_task = NULL;
324}
325
326
334static void
336 int fd)
337{
338 struct GNUNET_PQ_Context *db = cls;
339
341 "New poll FD is %d\n",
342 fd);
343 if (NULL != db->event_task)
344 {
345 GNUNET_SCHEDULER_cancel (db->event_task);
346 db->event_task = NULL;
347 }
348 GNUNET_free (db->rfd);
349 if (-1 == fd)
350 return;
351 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
352 return;
354 "Activating poll job on %d\n",
355 fd);
357 db->event_task
359 db->rfd,
361 db);
362}
363
364
372static void
374 const char *cmd,
375 struct GNUNET_DB_EventHandler *eh)
376{
377 char sql[16 + 64];
378 char *end;
379 PGresult *result;
380
381 if (NULL == db->conn)
382 return;
383 end = stpcpy (sql,
384 cmd);
385 end = sh_to_channel (&eh->sh,
386 end);
388 "Executing PQ command `%s'\n",
389 sql);
390 result = PQexec (db->conn,
391 sql);
392 if (PGRES_COMMAND_OK != PQresultStatus (result))
393 {
395 "pq",
396 "Failed to execute `%s': %s/%s/%s/%s/%s",
397 sql,
398 PQresultErrorField (result,
399 PG_DIAG_MESSAGE_PRIMARY),
400 PQresultErrorField (result,
401 PG_DIAG_MESSAGE_DETAIL),
402 PQresultErrorMessage (result),
403 PQresStatus (PQresultStatus (result)),
404 PQerrorMessage (db->conn));
405 }
406 PQclear (result);
407}
408
409
420 const struct GNUNET_ShortHashCode *sh,
421 void *value)
422{
423 struct GNUNET_PQ_Context *db = cls;
424 struct GNUNET_DB_EventHandler *eh = value;
425
427 "LISTEN X",
428 eh);
429 return GNUNET_OK;
430}
431
432
433void
435 int fd)
436{
438 "Change in PQ event FD to %d\n",
439 fd);
441 fd);
444 db);
445}
446
447
454static void
455event_timeout (void *cls)
456{
457 struct GNUNET_DB_EventHandler *eh = cls;
458
459 eh->timeout_task = NULL;
460 eh->cb (eh->cb_cls,
461 NULL,
462 0);
463}
464
465
468 const struct GNUNET_DB_EventHeaderP *es,
471 void *cb_cls)
472{
473 struct GNUNET_DB_EventHandler *eh;
474 bool sub;
475
477 eh->db = db;
478 es_to_sh (es,
479 &eh->sh);
480 eh->cb = cb;
481 eh->cb_cls = cb_cls;
482 sub = (NULL ==
484 &eh->sh));
487 &eh->sh,
488 eh,
490 if (NULL == db->event_task)
491 {
493 "Starting event scheduler\n");
495 PQsocket (db->conn));
496 }
497 if (sub)
499 "LISTEN X",
500 eh);
503 eh);
504 return eh;
505}
506
507
508void
510{
511 struct GNUNET_PQ_Context *db = eh->db;
512
515 &eh->sh,
516 eh));
517 if (NULL ==
519 &eh->sh))
521 "UNLISTEN X",
522 eh);
523 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
524 {
526 "Stopping PQ event scheduler job\n");
527 GNUNET_free (db->rfd);
528 if (NULL != db->event_task)
529 {
530 GNUNET_SCHEDULER_cancel (db->event_task);
531 db->event_task = NULL;
532 }
533 }
534 if (NULL != eh->timeout_task)
535 {
537 eh->timeout_task = NULL;
538 }
539 GNUNET_free (eh);
540}
541
542
/**
 * Compute the channel that one should notify upon for the given
 * event specification.
 *
 * The channel name is the letter `X` followed by the encoded short
 * hash of @a es.
 *
 * @param es event specification
 * @return copy of the channel name made with GNUNET_strdup()
 */
char *
GNUNET_PQ_get_event_notify_channel (const struct GNUNET_DB_EventHeaderP *es)
{
  char sql[16 + 64 + 8];
  char *end;

  end = stpcpy (sql,
                "X");
  /* appends the encoded hash and 0-terminates the buffer */
  end = es_to_channel (es,
                       end);
  GNUNET_assert (NULL != end);
  return GNUNET_strdup (sql);
}
556
557
558void
560 const struct GNUNET_DB_EventHeaderP *es,
561 const void *extra,
562 size_t extra_size)
563{
564 char sql[16 + 64 + extra_size * 8 / 5 + 8];
565 char *end;
566 PGresult *result;
567
568 end = stpcpy (sql,
569 "NOTIFY X");
571 end);
572 end = stpcpy (end,
573 ", '");
575 extra_size,
576 end,
577 sizeof (sql) - (end - sql) - 1);
578 GNUNET_assert (NULL != end);
579 *end = '\0';
580 end = stpcpy (end,
581 "'");
583 "Executing command `%s'\n",
584 sql);
585 result = PQexec (db->conn,
586 sql);
587 if (PGRES_COMMAND_OK != PQresultStatus (result))
588 {
590 "pq",
591 "Failed to execute `%s': %s/%s/%s/%s/%s",
592 sql,
593 PQresultErrorField (result,
594 PG_DIAG_MESSAGE_PRIMARY),
595 PQresultErrorField (result,
596 PG_DIAG_MESSAGE_DETAIL),
597 PQresultErrorMessage (result),
598 PQresStatus (PQresultStatus (result)),
599 PQerrorMessage (db->conn));
600 }
601 PQclear (result);
602 /* Make sure we do not miss this notification in case it was
603 for *us*, we need to trigger polling here.
604 Just waiting for the db socket to be readable won't work,
605 as postgres only queues notifications we triggered for
606 ourselves in an internal data structure. */
607 if (NULL == db->poller_task)
608 {
609 db->poller_task
611 db);
612 }
613}
614
615
616/* end of pq_event.c */
static struct GNUNET_TIME_Relative timeout
User defined timestamp for completing operations.
Definition: gnunet-arm.c:118
static int end
Set if we are to shutdown all services (including ARM).
Definition: gnunet-arm.c:33
static struct GNUNET_FS_Handle * ctx
static struct GNUNET_IDENTITY_Handle * sh
Handle to IDENTITY service.
static char * value
Value of the record to add/remove.
static int result
Global testing status.
static struct GNUNET_FS_DirectoryBuilder * db
void(* GNUNET_DB_EventCallback)(void *cls, const void *extra, size_t extra_size)
Function called on events received from Postgres.
Definition: gnunet_db_lib.h:80
void GNUNET_PQ_reconnect(struct GNUNET_PQ_Context *db)
Reinitialize the database db.
Definition: pq_connect.c:562
void GNUNET_CRYPTO_hash(const void *block, size_t size, struct GNUNET_HashCode *ret)
Compute hash of a given block.
Definition: crypto_hash.c:41
enum GNUNET_GenericReturnValue GNUNET_CONTAINER_multishortmap_put(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, void *value, enum GNUNET_CONTAINER_MultiHashMapOption opt)
Store a key-value pair in the map.
void * GNUNET_CONTAINER_multishortmap_get(const struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key)
Given a key find a value in the map matching the key.
int GNUNET_CONTAINER_multishortmap_iterate(struct GNUNET_CONTAINER_MultiShortmap *map, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map.
int GNUNET_CONTAINER_multishortmap_get_multiple(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, GNUNET_CONTAINER_ShortmapIterator it, void *it_cls)
Iterate over all entries in the map that match a particular key.
unsigned int GNUNET_CONTAINER_multishortmap_size(const struct GNUNET_CONTAINER_MultiShortmap *map)
Get the number of key-value pairs in the map.
int GNUNET_CONTAINER_multishortmap_remove(struct GNUNET_CONTAINER_MultiShortmap *map, const struct GNUNET_ShortHashCode *key, const void *value)
Remove the given key-value pair from the map.
@ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE
Allow multiple values with the same key.
#define GNUNET_log(kind,...)
#define GNUNET_log_from(kind, comp,...)
GNUNET_GenericReturnValue
Named constants for return values.
#define GNUNET_static_assert(cond)
Assertion to be checked (if supported by C compiler) at compile time, otherwise checked at runtime an...
@ GNUNET_OK
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
@ GNUNET_ERROR_TYPE_WARNING
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_INFO
#define GNUNET_strdup(a)
Wrapper around GNUNET_xstrdup_.
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
struct GNUNET_NETWORK_Handle * GNUNET_NETWORK_socket_box_native(int fd)
Box a native socket (and check that it is a socket).
Definition: network.c:580
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net(struct GNUNET_TIME_Relative delay, struct GNUNET_NETWORK_Handle *rfd, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay or when the specified file descriptor is ready f...
Definition: scheduler.c:1511
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:980
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now(GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run as soon as possible.
Definition: scheduler.c:1304
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1277
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data_alloc(const char *enc, size_t enclen, void **out, size_t *out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:882
char * GNUNET_STRINGS_data_to_string(const void *data, size_t size, char *out, size_t out_size)
Convert binary data to ASCII encoding using CrockfordBase32.
Definition: strings.c:736
enum GNUNET_GenericReturnValue GNUNET_STRINGS_string_to_data(const char *enc, size_t enclen, void *out, size_t out_size)
Convert CrockfordBase32 encoding back to data.
Definition: strings.c:816
#define GNUNET_TIME_UNIT_FOREVER_REL
Constant used to specify "forever".
struct GNUNET_TIME_Relative GNUNET_TIME_relative_max(struct GNUNET_TIME_Relative t1, struct GNUNET_TIME_Relative t2)
Return the maximum of two relative time values.
Definition: time.c:352
#define GNUNET_TIME_UNIT_SECONDS
One second.
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
shared internal data structures of libgnunetpq
static void do_poll(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller directly after having po...
Definition: pq_event.c:318
void GNUNET_PQ_event_do_poll(struct GNUNET_PQ_Context *db)
Poll for events right now.
Definition: pq_event.c:189
static enum GNUNET_GenericReturnValue channel_to_sh(const char *identifier, struct GNUNET_ShortHashCode *sh)
Convert a Postgres channel identifier back into the short hash sh.
Definition: pq_event.c:116
struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls)
Register callback to be invoked on events of type es.
Definition: pq_event.c:467
static void es_to_sh(const struct GNUNET_DB_EventHeaderP *es, struct GNUNET_ShortHashCode *sh)
Convert es to a short hash.
Definition: pq_event.c:69
void GNUNET_PQ_event_notify(struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, const void *extra, size_t extra_size)
Notify all that listen on es of an event.
Definition: pq_event.c:559
char * GNUNET_PQ_get_event_notify_channel(const struct GNUNET_DB_EventHeaderP *es)
Compute the channel that one should notify upon for the given event specification.
Definition: pq_event.c:544
static void event_timeout(void *cls)
Function run on timeout for an event.
Definition: pq_event.c:455
static enum GNUNET_GenericReturnValue register_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Re-subscribe to notifications after disconnect.
Definition: pq_event.c:419
static char * sh_to_channel(struct GNUNET_ShortHashCode *sh, char identifier[64])
Convert sh to a Postgres identifier.
Definition: pq_event.c:93
void GNUNET_PQ_event_reconnect_(struct GNUNET_PQ_Context *db, int fd)
Internal API.
Definition: pq_event.c:434
static void manage_subscribe(struct GNUNET_PQ_Context *db, const char *cmd, struct GNUNET_DB_EventHandler *eh)
Helper function to trigger an SQL cmd on db.
Definition: pq_event.c:373
static enum GNUNET_GenericReturnValue do_notify(void *cls, const struct GNUNET_ShortHashCode *sh, void *value)
Function called on every event handler that needs to be triggered.
Definition: pq_event.c:174
static void scheduler_fd_cb(void *cls, int fd)
Function called when the Postgres FD changes and we need to update the scheduler event loop task.
Definition: pq_event.c:335
static void do_scheduler_notify(void *cls)
The GNUnet scheduler notifies us that we need to trigger the DB event poller.
Definition: pq_event.c:272
void GNUNET_PQ_event_listen_cancel(struct GNUNET_DB_EventHandler *eh)
Stop notifications.
Definition: pq_event.c:509
static char * es_to_channel(const struct GNUNET_DB_EventHeaderP *es, char identifier[64])
Convert es to a Postgres identifier.
Definition: pq_event.c:135
Handle for an active LISTENer to the database.
Definition: pq_event.c:34
void * cb_cls
Closure for cb.
Definition: pq_event.c:48
struct GNUNET_SCHEDULER_Task * timeout_task
Task to run on timeout.
Definition: pq_event.c:58
struct GNUNET_ShortHashCode sh
Channel name.
Definition: pq_event.c:38
struct GNUNET_PQ_Context * db
Database context this event handler is with.
Definition: pq_event.c:53
GNUNET_DB_EventCallback cb
Function to call on events.
Definition: pq_event.c:43
Header of a structure that describes an event channel we may subscribe to or notify on.
Definition: gnunet_db_lib.h:92
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Definition: gnunet_db_lib.h:97
A 512-bit hashcode.
Handle to Postgres database.
Definition: pq.h:36
struct GNUNET_PQ_ExecuteStatement * es
Statements to execute upon connection.
Definition: pq.h:45
Entry in list of pending tasks.
Definition: scheduler.c:136
A 256-bit hashcode.
Time for relative time used by GNUnet, in microseconds.
Closure for do_notify().
Definition: pq_event.c:151
void * extra
Extra argument of the notification, or NULL.
Definition: pq_event.c:155
size_t extra_size
Number of bytes in extra.
Definition: pq_event.c:160