GNUnet 0.22.2
datastore_api.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet
3 Copyright (C) 2004-2013, 2016 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20
27#include "platform.h"
28#include "gnunet_arm_service.h"
29#include "gnunet_constants.h"
32#include "datastore.h"
33
/**
 * Log a message of the given kind under the "datastore-api" component
 * by forwarding all arguments to GNUNET_log_from().
 */
34#define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35
/**
 * NOTE(review): presumably the time a request may sit at the head of the
 * queue before delay_warning() complains (its log text says "for more
 * than %s") -- confirm against the scheduling call.
 */
36#define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37
/**
 * Compile-time switch for the `#if INSANE_STATISTICS` sections of this
 * file, which wrap additional statistics counters.  Set to GNUNET_NO here.
 */
41#define INSANE_STATISTICS GNUNET_NO
42
/**
 * NOTE(review): not referenced in the visible part of this file; presumably
 * an upper bound on results still accepted after the application lost
 * interest in a query (cf. `result_count` in the handle) -- confirm.
 */
49#define MAX_EXCESS_RESULTS 8
50
55{
60
64 void *cont_cls;
65};
66
67
72{
77
81 void *proc_cls;
82};
83
84
89{
91
93};
94
95
100{
105
110
115
120
124 void *cont_cls;
125
130
136
142
146 unsigned int priority;
147
152 unsigned int max_queue;
153
158};
159
160
165{
170
175
180
185
190
195
201
205 unsigned int queue_size;
206
212 unsigned int result_count;
213
217 unsigned int skip_next_messages;
218};
219
220
/**
 * Forward declaration; the definition appears further down in this file.
 * Try reconnecting to the datastore service.
 *
 * @param cls closure; the definition treats it as the
 *        `struct GNUNET_DATASTORE_Handle *` to reconnect
 */
226static void
227try_reconnect (void *cls);
228
229
236static void
238{
239 if (NULL == h->mq)
240 {
241 GNUNET_break (0);
242 return;
243 }
245 h->mq = NULL;
246 h->skip_next_messages = 0;
248 = GNUNET_SCHEDULER_add_delayed (h->retry_time,
250 h);
251}
252
253
261static void
263{
264 struct GNUNET_DATASTORE_Handle *h = qe->h;
265
266 GNUNET_CONTAINER_DLL_remove (h->queue_head,
267 h->queue_tail,
268 qe);
269 h->queue_size--;
270 if (NULL != qe->env)
272 if (NULL != qe->delay_warn_task)
274 GNUNET_free (qe);
275}
276
277
283static void
284delay_warning (void *cls)
285{
286 struct GNUNET_DATASTORE_QueueEntry *qe = cls;
287
288 qe->delay_warn_task = NULL;
290 "Request %p of type %u at head of datastore queue for more than %s\n",
291 qe,
292 (unsigned int) qe->response_type,
294 GNUNET_YES));
297 qe);
298}
299
300
307static void
309 enum GNUNET_MQ_Error error)
310{
311 struct GNUNET_DATASTORE_Handle *h = cls;
313
315 "MQ error, reconnecting to DATASTORE\n");
317 qe = h->queue_head;
318 if (NULL == qe)
319 return;
320 if (NULL != qe->delay_warn_task)
321 {
323 qe->delay_warn_task = NULL;
324 }
325 if (NULL == qe->env)
326 {
327 union QueueContext qc = qe->qc;
328 uint16_t rt = qe->response_type;
329
331 "Failed to receive response from database.\n");
333 switch (rt)
334 {
336 if (NULL != qc.sc.cont)
337 qc.sc.cont (qc.sc.cont_cls,
340 _ ("DATASTORE disconnected"));
341 break;
342
344 if (NULL != qc.rc.proc)
345 qc.rc.proc (qc.rc.proc_cls,
346 NULL,
347 0,
348 NULL,
349 0,
350 0,
351 0,
352 0,
354 0);
355 break;
356
357 default:
358 GNUNET_break (0);
359 }
360 }
361}
362
363
372{
374
376 "Establishing DATASTORE connection!\n");
378 h->cfg = cfg;
380 if (NULL == h->mq)
381 {
382 GNUNET_free (h);
383 return NULL;
384 }
385 h->stats = GNUNET_STATISTICS_create ("datastore-api",
386 cfg);
387 return h;
388}
389
390
397static void
399{
400 struct GNUNET_DATASTORE_Handle *h = cls;
401
403 "Drop sent, disconnecting\n");
405 GNUNET_NO);
406}
407
408
415static void
417 enum GNUNET_MQ_Error error)
418{
419 struct GNUNET_DATASTORE_Handle *h = cls;
420
422 "Failed to ask datastore to drop tables\n");
424 GNUNET_NO);
425}
426
427
435void
437 int drop)
438{
440
442 "Datastore disconnect\n");
443 if (NULL != h->mq)
444 {
446 h->mq = NULL;
447 }
448 if (NULL != h->reconnect_task)
449 {
451 h->reconnect_task = NULL;
452 }
453 while (NULL != (qe = h->queue_head))
454 {
455 switch (qe->response_type)
456 {
458 if (NULL != qe->qc.sc.cont)
459 qe->qc.sc.cont (qe->qc.sc.cont_cls,
462 _ ("Disconnected from DATASTORE"));
463 break;
464
466 if (NULL != qe->qc.rc.proc)
467 qe->qc.rc.proc (qe->qc.rc.proc_cls,
468 NULL,
469 0,
470 NULL,
471 0,
472 0,
473 0,
474 0,
476 0);
477 break;
478
479 default:
480 GNUNET_break (0);
481 }
483 }
484 if (GNUNET_YES == drop)
485 {
487 "Re-connecting to issue DROP!\n");
488 GNUNET_assert (NULL == h->mq);
490 "datastore",
491 NULL,
493 h);
494 if (NULL != h->mq)
495 {
496 struct GNUNET_MessageHeader *hdr;
497 struct GNUNET_MQ_Envelope *env;
498
499 env = GNUNET_MQ_msg (hdr,
503 h);
505 env);
506 return;
507 }
508 GNUNET_break (0);
509 }
511 GNUNET_NO);
512 h->stats = NULL;
513 GNUNET_free (h);
514}
515
516
532static struct GNUNET_DATASTORE_QueueEntry *
534 struct GNUNET_MQ_Envelope *env,
535 unsigned int queue_priority,
536 unsigned int max_queue_size,
537 uint16_t expected_type,
538 const union QueueContext *qc)
539{
541 struct GNUNET_DATASTORE_QueueEntry *pos;
542 unsigned int c;
543
544 if ((NULL != h->queue_tail) &&
545 (h->queue_tail->priority >= queue_priority))
546 {
547 c = h->queue_size;
548 pos = NULL;
549 }
550 else
551 {
552 c = 0;
553 pos = h->queue_head;
554 }
555 while ((NULL != pos) &&
556 (c < max_queue_size) &&
557 (pos->priority >= queue_priority))
558 {
559 c++;
560 pos = pos->next;
561 }
562 if (c >= max_queue_size)
563 {
565 gettext_noop ("# queue overflows"),
566 1,
567 GNUNET_NO);
569 return NULL;
570 }
572 qe->h = h;
573 qe->env = env;
574 qe->response_type = expected_type;
575 qe->qc = *qc;
576 qe->priority = queue_priority;
577 qe->max_queue = max_queue_size;
578 if (NULL == pos)
579 {
580 /* append at the tail */
581 pos = h->queue_tail;
582 }
583 else
584 {
585 pos = pos->prev;
586 /* do not insert at HEAD if HEAD query was already
587 * transmitted and we are still receiving replies! */
588 if ((NULL == pos) &&
589 (NULL == h->queue_head->env))
590 pos = h->queue_head;
591 }
592 c++;
593#if INSANE_STATISTICS
595 gettext_noop ("# queue entries created"),
596 1,
597 GNUNET_NO);
598#endif
600 h->queue_tail,
601 pos,
602 qe);
603 h->queue_size++;
604 return qe;
605}
606
607
614static void
616{
618
619 if (NULL == (qe = h->queue_head))
620 {
621 /* no entry in queue */
623 "Queue empty\n");
624 return;
625 }
626 if (NULL == qe->env)
627 {
628 /* waiting for replies */
630 "Head request already transmitted\n");
631 return;
632 }
633 if (NULL == h->mq)
634 {
635 /* waiting for reconnect */
637 "Not connected\n");
638 return;
639 }
643 qe);
645 qe->env);
646 qe->env = NULL;
647}
648
649
657static struct GNUNET_DATASTORE_QueueEntry *
659 uint16_t response_type)
660{
662
663 if (h->skip_next_messages > 0)
664 {
665 h->skip_next_messages--;
667 return NULL;
668 }
669 qe = h->queue_head;
670 if (NULL == qe)
671 {
672 GNUNET_break (0);
674 return NULL;
675 }
676 if (NULL != qe->env)
677 {
678 GNUNET_break (0);
680 return NULL;
681 }
683 {
684 GNUNET_break (0);
686 return NULL;
687 }
688 return qe;
689}
690
691
699static int
700check_status (void *cls,
701 const struct StatusMessage *sm)
702{
703 uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
704 int32_t status = ntohl (sm->status);
705
706 if (msize > 0)
707 {
708 const char *emsg = (const char *) &sm[1];
709
710 if ('\0' != emsg[msize - 1])
711 {
712 GNUNET_break (0);
713 return GNUNET_SYSERR;
714 }
715 }
716 else if (GNUNET_SYSERR == status)
717 {
718 GNUNET_break (0);
719 return GNUNET_SYSERR;
720 }
721 return GNUNET_OK;
722}
723
724
731static void
732handle_status (void *cls,
733 const struct StatusMessage *sm)
734{
735 struct GNUNET_DATASTORE_Handle *h = cls;
737 struct StatusContext rc;
738 const char *emsg;
739 int32_t status = ntohl (sm->status);
740
743 if (NULL == qe)
744 return;
745 rc = qe->qc.sc;
747 if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
748 emsg = (const char *) &sm[1];
749 else
750 emsg = NULL;
752 "Received status %d/%s\n",
753 (int) status,
754 emsg);
756 gettext_noop ("# status messages received"),
757 1,
758 GNUNET_NO);
759 h->retry_time = GNUNET_TIME_UNIT_ZERO;
761 if (NULL != rc.cont)
762 rc.cont (rc.cont_cls,
763 status,
765 emsg);
766}
767
768
775static int
776check_data (void *cls,
777 const struct DataMessage *dm)
778{
779 uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
780
781 if (msize != ntohl (dm->size))
782 {
783 GNUNET_break (0);
784 return GNUNET_SYSERR;
785 }
786 return GNUNET_OK;
787}
788
789
796static void
797handle_data (void *cls,
798 const struct DataMessage *dm)
799{
800 struct GNUNET_DATASTORE_Handle *h = cls;
802 struct ResultContext rc;
803
806 if (NULL == qe)
807 return;
808#if INSANE_STATISTICS
810 gettext_noop ("# Results received"),
811 1,
812 GNUNET_NO);
813#endif
815 "Received result %llu with type %u and size %u with key %s\n",
816 (unsigned long long) GNUNET_ntohll (dm->uid),
817 ntohl (dm->type),
818 ntohl (dm->size),
819 GNUNET_h2s (&dm->key));
820 rc = qe->qc.rc;
822 h->retry_time = GNUNET_TIME_UNIT_ZERO;
824 if (NULL != rc.proc)
825 rc.proc (rc.proc_cls,
826 &dm->key,
827 ntohl (dm->size),
828 &dm[1],
829 ntohl (dm->type),
830 ntohl (dm->priority),
831 ntohl (dm->anonymity),
832 ntohl (dm->replication),
834 GNUNET_ntohll (dm->uid));
835}
836
837
845static void
847 const struct GNUNET_MessageHeader *msg)
848{
849 struct GNUNET_DATASTORE_Handle *h = cls;
851 struct ResultContext rc;
852
855 if (NULL == qe)
856 return;
857 rc = qe->qc.rc;
860 "Received end of result set, new queue size is %u\n",
861 h->queue_size);
862 h->retry_time = GNUNET_TIME_UNIT_ZERO;
863 h->result_count = 0;
865 /* signal end of iteration */
866 if (NULL != rc.proc)
867 rc.proc (rc.proc_cls,
868 NULL,
869 0,
870 NULL,
871 0,
872 0,
873 0,
874 0,
876 0);
877}
878
879
885static void
886try_reconnect (void *cls)
887{
888 struct GNUNET_DATASTORE_Handle *h = cls;
892 struct StatusMessage,
893 h),
896 struct DataMessage,
897 h),
898 GNUNET_MQ_hd_fixed_size (data_end,
901 h),
903 };
904
905 h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
906 h->reconnect_task = NULL;
907 GNUNET_assert (NULL == h->mq);
909 "datastore",
910 handlers,
912 h);
913 if (NULL == h->mq)
914 return;
917 "# datastore connections (re)created"),
918 1,
919 GNUNET_NO);
921 "Reconnected to DATASTORE\n");
923}
924
925
934static void
936 int32_t result,
938 const char *emsg)
939{
940 /* do nothing */
941}
942
943
946 uint32_t rid,
947 const struct GNUNET_HashCode *key,
948 size_t size,
949 const void *data,
951 uint32_t priority,
952 uint32_t anonymity,
953 uint32_t replication,
955 unsigned int queue_priority,
956 unsigned int max_queue_size,
958 void *cont_cls)
959{
961 struct GNUNET_MQ_Envelope *env;
962 struct DataMessage *dm;
963 union QueueContext qc;
964
965 if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
966 {
967 GNUNET_break (0);
968 return NULL;
969 }
970
972 "Asked to put %lu bytes of data under key `%s' for %s\n",
973 (unsigned long) size,
974 GNUNET_h2s (key),
977 GNUNET_YES));
979 size,
981 dm->rid = htonl (rid);
982 dm->size = htonl ((uint32_t) size);
983 dm->type = htonl (type);
984 dm->priority = htonl (priority);
985 dm->anonymity = htonl (anonymity);
986 dm->replication = htonl (replication);
988 dm->key = *key;
989 GNUNET_memcpy (&dm[1],
990 data,
991 size);
992 qc.sc.cont = cont;
993 qc.sc.cont_cls = cont_cls;
995 env,
996 queue_priority,
997 max_queue_size,
999 &qc);
1000 if (NULL == qe)
1001 {
1003 "Could not create queue entry for PUT\n");
1004 return NULL;
1005 }
1007 gettext_noop ("# PUT requests executed"),
1008 1,
1009 GNUNET_NO);
1010 process_queue (h);
1011 return qe;
1012}
1013
1014
1032 uint64_t amount,
1033 uint32_t entries,
1035 void *cont_cls)
1036{
1038 struct GNUNET_MQ_Envelope *env;
1039 struct ReserveMessage *rm;
1040 union QueueContext qc;
1041
1042 if (NULL == cont)
1043 cont = &drop_status_cont;
1045 "Asked to reserve %llu bytes of data and %u entries\n",
1046 (unsigned long long) amount,
1047 (unsigned int) entries);
1048 env = GNUNET_MQ_msg (rm,
1050 rm->entries = htonl (entries);
1051 rm->amount = GNUNET_htonll (amount);
1052
1053 qc.sc.cont = cont;
1054 qc.sc.cont_cls = cont_cls;
1056 env,
1057 UINT_MAX,
1058 UINT_MAX,
1060 &qc);
1061 if (NULL == qe)
1062 {
1064 "Could not create queue entry to reserve\n");
1065 return NULL;
1066 }
1068 gettext_noop ("# RESERVE requests executed"),
1069 1,
1070 GNUNET_NO);
1071 process_queue (h);
1072 return qe;
1073}
1074
1075
1078 uint32_t rid,
1079 unsigned int queue_priority,
1080 unsigned int max_queue_size,
1082 void *cont_cls)
1083{
1085 struct GNUNET_MQ_Envelope *env;
1086 struct ReleaseReserveMessage *rrm;
1087 union QueueContext qc;
1088
1089 if (NULL == cont)
1090 cont = &drop_status_cont;
1092 "Asked to release reserve %d\n",
1093 rid);
1094 env = GNUNET_MQ_msg (rrm,
1096 rrm->rid = htonl (rid);
1097 qc.sc.cont = cont;
1098 qc.sc.cont_cls = cont_cls;
1100 env,
1101 queue_priority,
1102 max_queue_size,
1104 &qc);
1105 if (NULL == qe)
1106 {
1108 "Could not create queue entry to release reserve\n");
1109 return NULL;
1110 }
1113 ("# RELEASE RESERVE requests executed"), 1,
1114 GNUNET_NO);
1115 process_queue (h);
1116 return qe;
1117}
1118
1119
1122 const struct GNUNET_HashCode *key,
1123 size_t size,
1124 const void *data,
1125 unsigned int queue_priority,
1126 unsigned int max_queue_size,
1128 void *cont_cls)
1129{
1131 struct DataMessage *dm;
1132 struct GNUNET_MQ_Envelope *env;
1133 union QueueContext qc;
1134
1135 if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1136 {
1137 GNUNET_break (0);
1138 return NULL;
1139 }
1140 if (NULL == cont)
1141 cont = &drop_status_cont;
1143 "Asked to remove %lu bytes under key `%s'\n",
1144 (unsigned long) size,
1145 GNUNET_h2s (key));
1147 size,
1149 dm->size = htonl (size);
1150 dm->key = *key;
1151 GNUNET_memcpy (&dm[1],
1152 data,
1153 size);
1154
1155 qc.sc.cont = cont;
1156 qc.sc.cont_cls = cont_cls;
1157
1159 env,
1160 queue_priority,
1161 max_queue_size,
1163 &qc);
1164 if (NULL == qe)
1165 {
1167 "Could not create queue entry for REMOVE\n");
1168 return NULL;
1169 }
1171 gettext_noop ("# REMOVE requests executed"),
1172 1,
1173 GNUNET_NO);
1174 process_queue (h);
1175 return qe;
1176}
1177
1178
1199 unsigned int queue_priority,
1200 unsigned int max_queue_size,
1202 void *proc_cls)
1203{
1205 struct GNUNET_MQ_Envelope *env;
1206 struct GNUNET_MessageHeader *m;
1207 union QueueContext qc;
1208
1209 GNUNET_assert (NULL != proc);
1211 "Asked to get replication entry\n");
1212 env = GNUNET_MQ_msg (m,
1214 qc.rc.proc = proc;
1215 qc.rc.proc_cls = proc_cls;
1217 env,
1218 queue_priority,
1219 max_queue_size,
1221 &qc);
1222 if (NULL == qe)
1223 {
1225 "Could not create queue entry for GET REPLICATION\n");
1226 return NULL;
1227 }
1230 ("# GET REPLICATION requests executed"), 1,
1231 GNUNET_NO);
1232 process_queue (h);
1233 return qe;
1234}
1235
1236
1239 uint64_t next_uid,
1240 unsigned int queue_priority,
1241 unsigned int max_queue_size,
1244 void *proc_cls)
1245{
1247 struct GNUNET_MQ_Envelope *env;
1248 struct GetZeroAnonymityMessage *m;
1249 union QueueContext qc;
1250
1251 GNUNET_assert (NULL != proc);
1254 "Asked to get a zero-anonymity entry of type %d\n",
1255 type);
1256 env = GNUNET_MQ_msg (m,
1258 m->type = htonl ((uint32_t) type);
1259 m->next_uid = GNUNET_htonll (next_uid);
1260 qc.rc.proc = proc;
1261 qc.rc.proc_cls = proc_cls;
1263 env,
1264 queue_priority,
1265 max_queue_size,
1267 &qc);
1268 if (NULL == qe)
1269 {
1271 "Could not create queue entry for zero-anonymity procation\n");
1272 return NULL;
1273 }
1276 ("# GET ZERO ANONYMITY requests executed"), 1,
1277 GNUNET_NO);
1278 process_queue (h);
1279 return qe;
1280}
1281
1282
1303 uint64_t next_uid,
1304 bool random,
1305 const struct GNUNET_HashCode *key,
1307 unsigned int queue_priority,
1308 unsigned int max_queue_size,
1310 void *proc_cls)
1311{
1313 struct GNUNET_MQ_Envelope *env;
1314 struct GetKeyMessage *gkm;
1315 struct GetMessage *gm;
1316 union QueueContext qc;
1317
1318 GNUNET_assert (NULL != proc);
1320 "Asked to look for data of type %u under key `%s'\n",
1321 (unsigned int) type,
1322 (NULL != key) ? GNUNET_h2s (key) : "NULL");
1323 if (NULL == key)
1324 {
1325 env = GNUNET_MQ_msg (gm,
1327 gm->type = htonl (type);
1328 gm->next_uid = GNUNET_htonll (next_uid);
1329 gm->random = random;
1330 }
1331 else
1332 {
1333 env = GNUNET_MQ_msg (gkm,
1335 gkm->type = htonl (type);
1336 gkm->next_uid = GNUNET_htonll (next_uid);
1337 gkm->random = random;
1338 gkm->key = *key;
1339 }
1340 qc.rc.proc = proc;
1341 qc.rc.proc_cls = proc_cls;
1343 env,
1344 queue_priority,
1345 max_queue_size,
1347 &qc);
1348 if (NULL == qe)
1349 {
1351 "Could not queue request for `%s'\n",
1352 (NULL != key) ? GNUNET_h2s (key): "NULL");
1353 return NULL;
1354 }
1355#if INSANE_STATISTICS
1357 gettext_noop ("# GET requests executed"),
1358 1,
1359 GNUNET_NO);
1360#endif
1361 process_queue (h);
1362 return qe;
1363}
1364
1365
1372void
1374{
1375 struct GNUNET_DATASTORE_Handle *h = qe->h;
1376
1378 "Pending DATASTORE request %p cancelled (%d, %d)\n",
1379 qe,
1380 NULL == qe->env,
1381 h->queue_head == qe);
1382 if (NULL == qe->env)
1383 {
1385 h->skip_next_messages++;
1386 return;
1387 }
1389 process_queue (h);
1390}
1391
1392
1393/* end of datastore_api.c */
struct GNUNET_MQ_MessageHandlers handlers[]
Definition: 003.c:1
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
structs for communication between datastore service and API
static int check_status(void *cls, const struct StatusMessage *sm)
Function called to check status message from the service.
static void process_queue(struct GNUNET_DATASTORE_Handle *h)
Process entries in the queue (or do nothing if we are already doing so).
static void disconnect_after_drop(void *cls)
Task used to disconnect from the datastore after we send the GNUNET_MESSAGE_TYPE_DATASTORE_DROP message.
static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry(struct GNUNET_DATASTORE_Handle *h, struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, uint16_t expected_type, const union QueueContext *qc)
Create a new entry for our priority queue (and possibly discard other entries if the queue is getting...
static void disconnect_on_mq_error(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
static void try_reconnect(void *cls)
Try reconnecting to the datastore service.
#define DELAY_WARN_TIMEOUT
Definition: datastore_api.c:36
static void delay_warning(void *cls)
Task that logs an error after some time.
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
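Handle an error on the message queue to the datastore: log it, reconnect, and if the request at the head of the queue was already transmitted, signal the failure to its continuation or processor.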
static void do_disconnect(struct GNUNET_DATASTORE_Handle *h)
Disconnect from the service and then try reconnecting to the datastore service after some delay.
static void drop_status_cont(void *cls, int32_t result, struct GNUNET_TIME_Absolute min_expiration, const char *emsg)
Dummy continuation used to do nothing (but be non-zero).
static void free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
Free a queue entry.
static int check_data(void *cls, const struct DataMessage *dm)
Check data message we received from the service.
static void handle_status(void *cls, const struct StatusMessage *sm)
Function called to handle status message from the service.
static void handle_data_end(void *cls, const struct GNUNET_MessageHeader *msg)
Type of a function to call when we receive a GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the ...
#define LOG(kind,...)
Definition: datastore_api.c:34
static void handle_data(void *cls, const struct DataMessage *dm)
Handle data message we got from the service.
static struct GNUNET_DATASTORE_QueueEntry * get_queue_head(struct GNUNET_DATASTORE_Handle *h, uint16_t response_type)
Get the entry at the head of the message queue.
#define gettext_noop(String)
Definition: gettext.h:74
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:103
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:98
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:108
static struct GNUNET_DATASTORE_QueueEntry * qe
Current operation.
static unsigned int replication
Desired replication level.
static char * data
The data to insert into the dht.
struct GNUNET_HashCode key
The key used in the DHT.
static struct GNUNET_TIME_Relative expiration
User supplied expiration value.
static unsigned int anonymity
static uint32_t type
Type string converted to DNS type value.
static int status
The program status; 0 for success.
Definition: gnunet-nse.c:39
static int result
Global testing status.
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
API to create, modify and access statistics.
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message,...
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:1060
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Reserve space in the datastore.
void GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, int drop)
Disconnect from the datastore service (and free associated resources).
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
void(* GNUNET_DATASTORE_DatumProcessor)(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process a datum that was stored in the datastore.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Signal that all of the data for which a reservation was made has been stored and that whatever excess...
struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, unsigned int queue_priority, unsigned int max_queue_size, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a single zero-anonymity value from the datastore.
void(* GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, int32_t success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
#define GNUNET_log(kind,...)
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:54
void * cls
Closure for mv and cb.
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:37
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
@ GNUNET_OK
@ GNUNET_YES
@ GNUNET_NO
@ GNUNET_SYSERR
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
GNUNET_MQ_Error
Error codes for the queue.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:305
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:285
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:61
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:76
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:655
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:700
#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP
Message sent by datastore client to drop the database.
#define GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END
Message sent by datastore to client signaling end of matching data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY
Message sent by datastore client to get data by key.
#define GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE
Message sent by datastore client to remove data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET
Message sent by datastore client to get data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA
Message sent by datastore to client providing requested data (in response to GET or GET_RANDOM reques...
#define GNUNET_MESSAGE_TYPE_DATASTORE_PUT
Message sent by datastore client to store data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION
Message sent by datastore client to get random data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_STATUS
Message sent by datastore to client informing about status processing a request (in response to RESER...
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY
Message sent by datastore client to get random data.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:980
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1277
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:406
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:579
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:741
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:640
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
static unsigned int size
Size of the "table".
Definition: peer.c:68
#define _(String)
GNU gettext support macro.
Definition: platform.h:178
GNUNET_BLOCK_Type
WARNING: This header is generated! In order to add DHT block types, you must register them in GANA,...
@ GNUNET_BLOCK_TYPE_ANY
Identifier for any block.
Message transmitting content from or to the datastore service.
Definition: datastore.h:192
uint32_t priority
Priority of the item (NBO), zero for remove.
Definition: datastore.h:219
struct GNUNET_HashCode key
Key under which the item can be found.
Definition: datastore.h:252
uint64_t uid
Unique ID for the content (can be used for UPDATE); can be zero for remove (which indicates that the ...
Definition: datastore.h:242
struct GNUNET_TIME_AbsoluteNBO expiration
Expiration time (NBO); zero for remove.
Definition: datastore.h:247
struct GNUNET_MessageHeader header
Type is either GNUNET_MESSAGE_TYPE_DATASTORE_PUT, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE or GNUNET_MESS...
Definition: datastore.h:199
uint32_t type
Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:214
uint32_t size
Number of bytes in the item (NBO).
Definition: datastore.h:209
uint32_t replication
Desired replication level.
Definition: datastore.h:229
uint32_t anonymity
Desired anonymity level (NBO), zero for remove.
Definition: datastore.h:224
uint32_t rid
Reservation ID to use; use zero for none.
Definition: datastore.h:204
struct GNUNET_MQ_Handle * mq
Our connection to the ARM service.
Definition: arm_api.c:107
const struct GNUNET_CONFIGURATION_Handle * cfg
The configuration that we are using.
Definition: arm_api.c:112
struct GNUNET_SCHEDULER_Task * reconnect_task
ID of the reconnect task (if any).
Definition: arm_api.c:147
Handle to the datastore service.
unsigned int queue_size
Number of entries in the queue.
unsigned int skip_next_messages
We should ignore the next message(s) from the service.
struct GNUNET_TIME_Relative retry_time
How quickly should we retry? Used for exponential back-off on connect-errors.
struct GNUNET_DATASTORE_QueueEntry * queue_head
Current head of priority queue.
struct GNUNET_SCHEDULER_Task * reconnect_task
Task for trying to reconnect.
unsigned int result_count
Number of results we're receiving for the current query after the application stopped caring.
struct GNUNET_STATISTICS_Handle * stats
Handle for statistics.
const struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
struct GNUNET_MQ_Handle * mq
Current connection to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * queue_tail
Current tail of priority queue.
Entry in our priority queue.
union QueueContext qc
Context for the operation.
struct GNUNET_DATASTORE_QueueEntry * prev
This is a linked list.
uint16_t response_type
Expected response type.
unsigned int priority
Priority in the queue.
struct GNUNET_DATASTORE_Handle * h
Handle to the master context.
unsigned int max_queue
Maximum allowed length of queue (otherwise this request should be discarded).
struct GNUNET_MQ_Envelope * env
Envelope of the request to transmit, NULL after transmission.
struct GNUNET_DATASTORE_QueueEntry * next
This is a linked list.
GNUNET_DATASTORE_ContinuationWithStatus cont
Function to call after transmission of the request.
void * cont_cls
Closure for cont.
struct GNUNET_SCHEDULER_Task * delay_warn_task
Task we run if this entry stalls the queue and we need to warn the user.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:87
Message handler for a specific message type.
Header for all communications.
Entry in list of pending tasks.
Definition: scheduler.c:136
Handle for the service.
Time for absolute times used by GNUnet, in microseconds.
Time for relative time used by GNUnet, in microseconds.
Message to the datastore service asking about specific content.
Definition: datastore.h:108
uint32_t type
Desired content type.
Definition: datastore.h:117
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:122
struct GNUNET_HashCode key
Desired key.
Definition: datastore.h:132
uint32_t random
If true, return a random result.
Definition: datastore.h:127
Message to the datastore service asking about specific content.
Definition: datastore.h:141
uint32_t random
If true, return a random result.
Definition: datastore.h:160
uint32_t type
Desired content type.
Definition: datastore.h:150
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:155
Message to the datastore service asking about zero anonymity content.
Definition: datastore.h:169
Message from datastore client informing service that the remainder of the reserved bytes can now be r...
Definition: datastore.h:90
int32_t rid
Reservation id.
Definition: datastore.h:99
Message from datastore service informing client about the current size of the datastore.
Definition: datastore.h:40
uint32_t entries
Number of items to reserve.
Definition: datastore.h:49
uint64_t amount
Number of bytes to reserve.
Definition: datastore.h:54
Context for processing result messages.
Definition: datastore_api.c:72
GNUNET_DATASTORE_DatumProcessor proc
Function to call with the result.
Definition: datastore_api.c:76
void * proc_cls
Closure for proc.
Definition: datastore_api.c:81
Context for processing status messages.
Definition: datastore_api.c:55
void * cont_cls
Closure for cont.
Definition: datastore_api.c:64
GNUNET_DATASTORE_ContinuationWithStatus cont
Continuation to call with the status.
Definition: datastore_api.c:59
Message from datastore service informing client about the success or failure of a requested operation...
Definition: datastore.h:65
struct GNUNET_TIME_AbsoluteNBO min_expiration
Minimum expiration time required for content to be stored by the datacache at this time,...
Definition: datastore.h:80
int32_t status
Status code, -1 for errors.
Definition: datastore.h:74
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_DATASTORE_STATUS.
Definition: datastore.h:69
Context for a queue operation.
Definition: datastore_api.c:89
struct ResultContext rc
Definition: datastore_api.c:92
struct StatusContext sc
Definition: datastore_api.c:90