GNUnet  0.11.x
datastore_api.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2004-2013, 2016 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
32 #include "datastore.h"
33 
/**
 * Logging shorthand: forwards to GNUNET_log_from() with the component
 * name fixed to "datastore-api".
 */
34 #define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35 
/**
 * Time span defined as one GNUNET_TIME_UNIT_MINUTES.
 * NOTE(review): presumably the delay after which delay_warning() reports a
 * request stuck at the head of the queue -- the use site is not visible in
 * this listing, confirm against the full source.
 */
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 
/**
 * Compile-time switch for the extra statistics sections guarded by
 * "#if INSANE_STATISTICS" in this file; currently disabled (GNUNET_NO).
 */
41 #define INSANE_STATISTICS GNUNET_NO
42 
/**
 * Limit of 8 "excess" results.
 * NOTE(review): not referenced by any line visible in this listing; the
 * exact meaning must be confirmed against the full source.
 */
49 #define MAX_EXCESS_RESULTS 8
50 
55 {
60 
64  void *cont_cls;
65 };
66 
67 
72 {
77 
81  void *proc_cls;
82 };
83 
84 
89 {
90  struct StatusContext sc;
91 
92  struct ResultContext rc;
93 };
94 
95 
100 {
105 
110 
115 
120 
124  void *cont_cls;
125 
129  union QueueContext qc;
130 
136 
142 
146  unsigned int priority;
147 
152  unsigned int max_queue;
153 
157  uint16_t response_type;
158 };
159 
160 
165 {
170 
175 
180 
185 
190 
195 
200  struct GNUNET_TIME_Relative retry_time;
201 
205  unsigned int queue_size;
206 
212  unsigned int result_count;
213 
217  unsigned int skip_next_messages;
218 };
219 
220 
/**
 * Try reconnecting to the datastore service.
 *
 * Forward declaration; the definition follows further down in this file.
 *
 * @param cls closure, interpreted by the definition as the
 *        `struct GNUNET_DATASTORE_Handle` to reconnect
 */
226 static void
227 try_reconnect (void *cls);
228 
229 
236 static void
238 {
239  if (NULL == h->mq)
240  {
241  GNUNET_break (0);
242  return;
243  }
244  GNUNET_MQ_destroy (h->mq);
245  h->mq = NULL;
246  h->skip_next_messages = 0;
247  h->reconnect_task
249  &try_reconnect,
250  h);
251 }
252 
253 
261 static void
263 {
264  struct GNUNET_DATASTORE_Handle *h = qe->h;
265 
267  h->queue_tail,
268  qe);
269  h->queue_size--;
270  if (NULL != qe->env)
271  GNUNET_MQ_discard (qe->env);
272  if (NULL != qe->delay_warn_task)
274  GNUNET_free (qe);
275 }
276 
277 
283 static void
284 delay_warning (void *cls)
285 {
286  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
287 
288  qe->delay_warn_task = NULL;
290  "Request %p of type %u at head of datastore queue for more than %s\n",
291  qe,
292  (unsigned int) qe->response_type,
294  GNUNET_YES));
296  &delay_warning,
297  qe);
298 }
299 
300 
307 static void
308 mq_error_handler (void *cls,
309  enum GNUNET_MQ_Error error)
310 {
311  struct GNUNET_DATASTORE_Handle *h = cls;
313 
315  "MQ error, reconnecting to DATASTORE\n");
316  do_disconnect (h);
317  qe = h->queue_head;
318  if (NULL == qe)
319  return;
320  if (NULL != qe->delay_warn_task)
321  {
323  qe->delay_warn_task = NULL;
324  }
325  if (NULL == qe->env)
326  {
327  union QueueContext qc = qe->qc;
328  uint16_t rt = qe->response_type;
329 
331  "Failed to receive response from database.\n");
332  free_queue_entry (qe);
333  switch (rt)
334  {
336  if (NULL != qc.sc.cont)
337  qc.sc.cont (qc.sc.cont_cls,
340  _ ("DATASTORE disconnected"));
341  break;
342 
344  if (NULL != qc.rc.proc)
345  qc.rc.proc (qc.rc.proc_cls,
346  NULL,
347  0,
348  NULL,
349  0,
350  0,
351  0,
352  0,
354  0);
355  break;
356 
357  default:
358  GNUNET_break (0);
359  }
360  }
361 }
362 
363 
372 {
373  struct GNUNET_DATASTORE_Handle *h;
374 
376  "Establishing DATASTORE connection!\n");
377  h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
378  h->cfg = cfg;
379  try_reconnect (h);
380  if (NULL == h->mq)
381  {
382  GNUNET_free (h);
383  return NULL;
384  }
385  h->stats = GNUNET_STATISTICS_create ("datastore-api",
386  cfg);
387  return h;
388 }
389 
390 
397 static void
399 {
400  struct GNUNET_DATASTORE_Handle *h = cls;
401 
403  "Drop sent, disconnecting\n");
405  GNUNET_NO);
406 }
407 
408 
415 static void
417  enum GNUNET_MQ_Error error)
418 {
419  struct GNUNET_DATASTORE_Handle *h = cls;
420 
422  "Failed to ask datastore to drop tables\n");
424  GNUNET_NO);
425 }
426 
427 
435 void
437  int drop)
438 {
440 
442  "Datastore disconnect\n");
443  if (NULL != h->mq)
444  {
445  GNUNET_MQ_destroy (h->mq);
446  h->mq = NULL;
447  }
448  if (NULL != h->reconnect_task)
449  {
451  h->reconnect_task = NULL;
452  }
453  while (NULL != (qe = h->queue_head))
454  {
455  switch (qe->response_type)
456  {
458  if (NULL != qe->qc.sc.cont)
459  qe->qc.sc.cont (qe->qc.sc.cont_cls,
462  _ ("Disconnected from DATASTORE"));
463  break;
464 
466  if (NULL != qe->qc.rc.proc)
467  qe->qc.rc.proc (qe->qc.rc.proc_cls,
468  NULL,
469  0,
470  NULL,
471  0,
472  0,
473  0,
474  0,
476  0);
477  break;
478 
479  default:
480  GNUNET_break (0);
481  }
482  free_queue_entry (qe);
483  }
484  if (GNUNET_YES == drop)
485  {
487  "Re-connecting to issue DROP!\n");
488  GNUNET_assert (NULL == h->mq);
489  h->mq = GNUNET_CLIENT_connect (h->cfg,
490  "datastore",
491  NULL,
493  h);
494  if (NULL != h->mq)
495  {
496  struct GNUNET_MessageHeader *hdr;
497  struct GNUNET_MQ_Envelope *env;
498 
499  env = GNUNET_MQ_msg (hdr,
503  h);
504  GNUNET_MQ_send (h->mq,
505  env);
506  return;
507  }
508  GNUNET_break (0);
509  }
511  GNUNET_NO);
512  h->stats = NULL;
513  GNUNET_free (h);
514 }
515 
516 
532 static struct GNUNET_DATASTORE_QueueEntry *
534  struct GNUNET_MQ_Envelope *env,
535  unsigned int queue_priority,
536  unsigned int max_queue_size,
537  uint16_t expected_type,
538  const union QueueContext *qc)
539 {
541  struct GNUNET_DATASTORE_QueueEntry *pos;
542  unsigned int c;
543 
544  if ((NULL != h->queue_tail) &&
545  (h->queue_tail->priority >= queue_priority))
546  {
547  c = h->queue_size;
548  pos = NULL;
549  }
550  else
551  {
552  c = 0;
553  pos = h->queue_head;
554  }
555  while ((NULL != pos) &&
556  (c < max_queue_size) &&
557  (pos->priority >= queue_priority))
558  {
559  c++;
560  pos = pos->next;
561  }
562  if (c >= max_queue_size)
563  {
565  gettext_noop ("# queue overflows"),
566  1,
567  GNUNET_NO);
568  GNUNET_MQ_discard (env);
569  return NULL;
570  }
572  qe->h = h;
573  qe->env = env;
574  qe->response_type = expected_type;
575  qe->qc = *qc;
576  qe->priority = queue_priority;
577  qe->max_queue = max_queue_size;
578  if (NULL == pos)
579  {
580  /* append at the tail */
581  pos = h->queue_tail;
582  }
583  else
584  {
585  pos = pos->prev;
586  /* do not insert at HEAD if HEAD query was already
587  * transmitted and we are still receiving replies! */
588  if ((NULL == pos) &&
589  (NULL == h->queue_head->env))
590  pos = h->queue_head;
591  }
592  c++;
593 #if INSANE_STATISTICS
595  gettext_noop ("# queue entries created"),
596  1,
597  GNUNET_NO);
598 #endif
600  h->queue_tail,
601  pos,
602  qe);
603  h->queue_size++;
604  return qe;
605 }
606 
607 
614 static void
616 {
618 
619  if (NULL == (qe = h->queue_head))
620  {
621  /* no entry in queue */
623  "Queue empty\n");
624  return;
625  }
626  if (NULL == qe->env)
627  {
628  /* waiting for replies */
630  "Head request already transmitted\n");
631  return;
632  }
633  if (NULL == h->mq)
634  {
635  /* waiting for reconnect */
637  "Not connected\n");
638  return;
639  }
640  GNUNET_assert (NULL == qe->delay_warn_task);
642  &delay_warning,
643  qe);
644  GNUNET_MQ_send (h->mq,
645  qe->env);
646  qe->env = NULL;
647 }
648 
649 
657 static struct GNUNET_DATASTORE_QueueEntry *
659  uint16_t response_type)
660 {
662 
663  if (h->skip_next_messages > 0)
664  {
665  h->skip_next_messages--;
666  process_queue (h);
667  return NULL;
668  }
669  qe = h->queue_head;
670  if (NULL == qe)
671  {
672  GNUNET_break (0);
673  do_disconnect (h);
674  return NULL;
675  }
676  if (NULL != qe->env)
677  {
678  GNUNET_break (0);
679  do_disconnect (h);
680  return NULL;
681  }
682  if (response_type != qe->response_type)
683  {
684  GNUNET_break (0);
685  do_disconnect (h);
686  return NULL;
687  }
688  return qe;
689 }
690 
691 
/**
 * Sanity-check a status message received from the service.
 *
 * Any bytes that follow the fixed-size `struct StatusMessage` are treated
 * as an error string and must end in a NUL byte.  A message that carries
 * no such string is rejected if its status is #GNUNET_SYSERR.
 *
 * @param cls closure (not used by this function)
 * @param sm status message to check
 * @return #GNUNET_OK if the message passes the checks,
 *         #GNUNET_SYSERR otherwise
 */
699 static int
700 check_status (void *cls,
701  const struct StatusMessage *sm)
702 {
/* Number of bytes following the fixed-size struct.
 * NOTE(review): the subtraction assumes header.size >= sizeof(*sm);
 * presumably guaranteed by the caller -- confirm. */
703  uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
704  int32_t status = ntohl (sm->status);
705 
706  if (msize > 0)
707  {
/* The trailing bytes start right after the struct. */
708  const char *emsg = (const char *) &sm[1];
709 
/* Reject trailing data that is not NUL-terminated. */
710  if ('\0' != emsg[msize - 1])
711  {
712  GNUNET_break (0);
713  return GNUNET_SYSERR;
714  }
715  }
/* No trailing string at all: a GNUNET_SYSERR status is not accepted. */
716  else if (GNUNET_SYSERR == status)
717  {
718  GNUNET_break (0);
719  return GNUNET_SYSERR;
720  }
721  return GNUNET_OK;
722 }
723 
724 
731 static void
732 handle_status (void *cls,
733  const struct StatusMessage *sm)
734 {
735  struct GNUNET_DATASTORE_Handle *h = cls;
737  struct StatusContext rc;
738  const char *emsg;
739  int32_t status = ntohl (sm->status);
740 
741  qe = get_queue_head (h,
743  if (NULL == qe)
744  return;
745  rc = qe->qc.sc;
746  free_queue_entry (qe);
747  if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
748  emsg = (const char *) &sm[1];
749  else
750  emsg = NULL;
752  "Received status %d/%s\n",
753  (int) status,
754  emsg);
756  gettext_noop ("# status messages received"),
757  1,
758  GNUNET_NO);
760  process_queue (h);
761  if (NULL != rc.cont)
762  rc.cont (rc.cont_cls,
763  status,
765  emsg);
766 }
767 
768 
/**
 * Check data message we received from the service.
 *
 * Verifies that the number of bytes following the fixed-size
 * `struct DataMessage` (derived from the message header) equals the
 * length announced in the `size` field of the message.
 *
 * @param cls closure (not used by this function)
 * @param dm data message to check
 * @return #GNUNET_OK if both lengths agree, #GNUNET_SYSERR otherwise
 */
775 static int
776 check_data (void *cls,
777  const struct DataMessage *dm)
778 {
/* Payload length according to the header.
 * NOTE(review): assumes header.size >= sizeof(*dm) -- confirm that the
 * caller guarantees this before the subtraction. */
779  uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
780 
/* The header-derived length and the announced length must match. */
781  if (msize != ntohl (dm->size))
782  {
783  GNUNET_break (0);
784  return GNUNET_SYSERR;
785  }
786  return GNUNET_OK;
787 }
788 
789 
796 static void
797 handle_data (void *cls,
798  const struct DataMessage *dm)
799 {
800  struct GNUNET_DATASTORE_Handle *h = cls;
802  struct ResultContext rc;
803 
804  qe = get_queue_head (h,
806  if (NULL == qe)
807  return;
808 #if INSANE_STATISTICS
810  gettext_noop ("# Results received"),
811  1,
812  GNUNET_NO);
813 #endif
815  "Received result %llu with type %u and size %u with key %s\n",
816  (unsigned long long) GNUNET_ntohll (dm->uid),
817  ntohl (dm->type),
818  ntohl (dm->size),
819  GNUNET_h2s (&dm->key));
820  rc = qe->qc.rc;
821  free_queue_entry (qe);
823  process_queue (h);
824  if (NULL != rc.proc)
825  rc.proc (rc.proc_cls,
826  &dm->key,
827  ntohl (dm->size),
828  &dm[1],
829  ntohl (dm->type),
830  ntohl (dm->priority),
831  ntohl (dm->anonymity),
832  ntohl (dm->replication),
834  GNUNET_ntohll (dm->uid));
835 }
836 
837 
845 static void
846 handle_data_end (void *cls,
847  const struct GNUNET_MessageHeader *msg)
848 {
849  struct GNUNET_DATASTORE_Handle *h = cls;
851  struct ResultContext rc;
852 
853  qe = get_queue_head (h,
855  if (NULL == qe)
856  return;
857  rc = qe->qc.rc;
858  free_queue_entry (qe);
860  "Received end of result set, new queue size is %u\n",
861  h->queue_size);
863  h->result_count = 0;
864  process_queue (h);
865  /* signal end of iteration */
866  if (NULL != rc.proc)
867  rc.proc (rc.proc_cls,
868  NULL,
869  0,
870  NULL,
871  0,
872  0,
873  0,
874  0,
876  0);
877 }
878 
879 
885 static void
886 try_reconnect (void *cls)
887 {
888  struct GNUNET_DATASTORE_Handle *h = cls;
889  struct GNUNET_MQ_MessageHandler handlers[] = {
892  struct StatusMessage,
893  h),
896  struct DataMessage,
897  h),
898  GNUNET_MQ_hd_fixed_size (data_end,
900  struct GNUNET_MessageHeader,
901  h),
903  };
904 
906  h->reconnect_task = NULL;
907  GNUNET_assert (NULL == h->mq);
908  h->mq = GNUNET_CLIENT_connect (h->cfg,
909  "datastore",
910  handlers,
912  h);
913  if (NULL == h->mq)
914  return;
916  gettext_noop (
917  "# datastore connections (re)created"),
918  1,
919  GNUNET_NO);
921  "Reconnected to DATASTORE\n");
922  process_queue (h);
923 }
924 
925 
934 static void
936  int32_t result,
938  const char *emsg)
939 {
940  /* do nothing */
941 }
942 
943 
971  uint32_t rid,
972  const struct GNUNET_HashCode *key,
973  size_t size,
974  const void *data,
975  enum GNUNET_BLOCK_Type type,
976  uint32_t priority,
977  uint32_t anonymity,
978  uint32_t replication,
980  unsigned int queue_priority,
981  unsigned int max_queue_size,
983  void *cont_cls)
984 {
986  struct GNUNET_MQ_Envelope *env;
987  struct DataMessage *dm;
988  union QueueContext qc;
989 
990  if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
991  {
992  GNUNET_break (0);
993  return NULL;
994  }
995 
997  "Asked to put %u bytes of data under key `%s' for %s\n",
998  size,
999  GNUNET_h2s (key),
1002  GNUNET_YES));
1003  env = GNUNET_MQ_msg_extra (dm,
1004  size,
1006  dm->rid = htonl (rid);
1007  dm->size = htonl ((uint32_t) size);
1008  dm->type = htonl (type);
1009  dm->priority = htonl (priority);
1010  dm->anonymity = htonl (anonymity);
1011  dm->replication = htonl (replication);
1012  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1013  dm->key = *key;
1014  GNUNET_memcpy (&dm[1],
1015  data,
1016  size);
1017  qc.sc.cont = cont;
1018  qc.sc.cont_cls = cont_cls;
1019  qe = make_queue_entry (h,
1020  env,
1021  queue_priority,
1022  max_queue_size,
1024  &qc);
1025  if (NULL == qe)
1026  {
1028  "Could not create queue entry for PUT\n");
1029  return NULL;
1030  }
1032  gettext_noop ("# PUT requests executed"),
1033  1,
1034  GNUNET_NO);
1035  process_queue (h);
1036  return qe;
1037 }
1038 
1039 
1057  uint64_t amount,
1058  uint32_t entries,
1060  void *cont_cls)
1061 {
1063  struct GNUNET_MQ_Envelope *env;
1064  struct ReserveMessage *rm;
1065  union QueueContext qc;
1066 
1067  if (NULL == cont)
1068  cont = &drop_status_cont;
1070  "Asked to reserve %llu bytes of data and %u entries\n",
1071  (unsigned long long) amount,
1072  (unsigned int) entries);
1073  env = GNUNET_MQ_msg (rm,
1075  rm->entries = htonl (entries);
1076  rm->amount = GNUNET_htonll (amount);
1077 
1078  qc.sc.cont = cont;
1079  qc.sc.cont_cls = cont_cls;
1080  qe = make_queue_entry (h,
1081  env,
1082  UINT_MAX,
1083  UINT_MAX,
1085  &qc);
1086  if (NULL == qe)
1087  {
1089  "Could not create queue entry to reserve\n");
1090  return NULL;
1091  }
1093  gettext_noop ("# RESERVE requests executed"),
1094  1,
1095  GNUNET_NO);
1096  process_queue (h);
1097  return qe;
1098 }
1099 
1100 
1123  uint32_t rid,
1124  unsigned int queue_priority,
1125  unsigned int max_queue_size,
1127  void *cont_cls)
1128 {
1130  struct GNUNET_MQ_Envelope *env;
1131  struct ReleaseReserveMessage *rrm;
1132  union QueueContext qc;
1133 
1134  if (NULL == cont)
1135  cont = &drop_status_cont;
1137  "Asked to release reserve %d\n",
1138  rid);
1139  env = GNUNET_MQ_msg (rrm,
1141  rrm->rid = htonl (rid);
1142  qc.sc.cont = cont;
1143  qc.sc.cont_cls = cont_cls;
1144  qe = make_queue_entry (h,
1145  env,
1146  queue_priority,
1147  max_queue_size,
1149  &qc);
1150  if (NULL == qe)
1151  {
1153  "Could not create queue entry to release reserve\n");
1154  return NULL;
1155  }
1157  gettext_noop
1158  ("# RELEASE RESERVE requests executed"), 1,
1159  GNUNET_NO);
1160  process_queue (h);
1161  return qe;
1162 }
1163 
1164 
1187  const struct GNUNET_HashCode *key,
1188  size_t size,
1189  const void *data,
1190  unsigned int queue_priority,
1191  unsigned int max_queue_size,
1193  void *cont_cls)
1194 {
1196  struct DataMessage *dm;
1197  struct GNUNET_MQ_Envelope *env;
1198  union QueueContext qc;
1199 
1200  if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1201  {
1202  GNUNET_break (0);
1203  return NULL;
1204  }
1205  if (NULL == cont)
1206  cont = &drop_status_cont;
1208  "Asked to remove %u bytes under key `%s'\n",
1209  size,
1210  GNUNET_h2s (key));
1211  env = GNUNET_MQ_msg_extra (dm,
1212  size,
1214  dm->size = htonl (size);
1215  dm->key = *key;
1216  GNUNET_memcpy (&dm[1],
1217  data,
1218  size);
1219 
1220  qc.sc.cont = cont;
1221  qc.sc.cont_cls = cont_cls;
1222 
1223  qe = make_queue_entry (h,
1224  env,
1225  queue_priority,
1226  max_queue_size,
1228  &qc);
1229  if (NULL == qe)
1230  {
1232  "Could not create queue entry for REMOVE\n");
1233  return NULL;
1234  }
1236  gettext_noop ("# REMOVE requests executed"),
1237  1,
1238  GNUNET_NO);
1239  process_queue (h);
1240  return qe;
1241 }
1242 
1243 
1264  unsigned int queue_priority,
1265  unsigned int max_queue_size,
1267  void *proc_cls)
1268 {
1270  struct GNUNET_MQ_Envelope *env;
1271  struct GNUNET_MessageHeader *m;
1272  union QueueContext qc;
1273 
1274  GNUNET_assert (NULL != proc);
1276  "Asked to get replication entry\n");
1277  env = GNUNET_MQ_msg (m,
1279  qc.rc.proc = proc;
1280  qc.rc.proc_cls = proc_cls;
1281  qe = make_queue_entry (h,
1282  env,
1283  queue_priority,
1284  max_queue_size,
1286  &qc);
1287  if (NULL == qe)
1288  {
1290  "Could not create queue entry for GET REPLICATION\n");
1291  return NULL;
1292  }
1294  gettext_noop
1295  ("# GET REPLICATION requests executed"), 1,
1296  GNUNET_NO);
1297  process_queue (h);
1298  return qe;
1299 }
1300 
1301 
1320  uint64_t next_uid,
1321  unsigned int queue_priority,
1322  unsigned int max_queue_size,
1323  enum GNUNET_BLOCK_Type type,
1325  void *proc_cls)
1326 {
1328  struct GNUNET_MQ_Envelope *env;
1329  struct GetZeroAnonymityMessage *m;
1330  union QueueContext qc;
1331 
1332  GNUNET_assert (NULL != proc);
1335  "Asked to get a zero-anonymity entry of type %d\n",
1336  type);
1337  env = GNUNET_MQ_msg (m,
1339  m->type = htonl ((uint32_t) type);
1340  m->next_uid = GNUNET_htonll (next_uid);
1341  qc.rc.proc = proc;
1342  qc.rc.proc_cls = proc_cls;
1343  qe = make_queue_entry (h,
1344  env,
1345  queue_priority,
1346  max_queue_size,
1348  &qc);
1349  if (NULL == qe)
1350  {
1352  "Could not create queue entry for zero-anonymity procation\n");
1353  return NULL;
1354  }
1356  gettext_noop
1357  ("# GET ZERO ANONYMITY requests executed"), 1,
1358  GNUNET_NO);
1359  process_queue (h);
1360  return qe;
1361 }
1362 
1363 
1384  uint64_t next_uid,
1385  bool random,
1386  const struct GNUNET_HashCode *key,
1387  enum GNUNET_BLOCK_Type type,
1388  unsigned int queue_priority,
1389  unsigned int max_queue_size,
1391  void *proc_cls)
1392 {
1394  struct GNUNET_MQ_Envelope *env;
1395  struct GetKeyMessage *gkm;
1396  struct GetMessage *gm;
1397  union QueueContext qc;
1398 
1399  GNUNET_assert (NULL != proc);
1401  "Asked to look for data of type %u under key `%s'\n",
1402  (unsigned int) type,
1403  GNUNET_h2s (key));
1404  if (NULL == key)
1405  {
1406  env = GNUNET_MQ_msg (gm,
1408  gm->type = htonl (type);
1409  gm->next_uid = GNUNET_htonll (next_uid);
1410  gm->random = random;
1411  }
1412  else
1413  {
1414  env = GNUNET_MQ_msg (gkm,
1416  gkm->type = htonl (type);
1417  gkm->next_uid = GNUNET_htonll (next_uid);
1418  gkm->random = random;
1419  gkm->key = *key;
1420  }
1421  qc.rc.proc = proc;
1422  qc.rc.proc_cls = proc_cls;
1423  qe = make_queue_entry (h,
1424  env,
1425  queue_priority,
1426  max_queue_size,
1428  &qc);
1429  if (NULL == qe)
1430  {
1432  "Could not queue request for `%s'\n",
1433  GNUNET_h2s (key));
1434  return NULL;
1435  }
1436 #if INSANE_STATISTICS
1438  gettext_noop ("# GET requests executed"),
1439  1,
1440  GNUNET_NO);
1441 #endif
1442  process_queue (h);
1443  return qe;
1444 }
1445 
1446 
1453 void
1455 {
1456  struct GNUNET_DATASTORE_Handle *h = qe->h;
1457 
1459  "Pending DATASTORE request %p cancelled (%d, %d)\n",
1460  qe,
1461  NULL == qe->env,
1462  h->queue_head == qe);
1463  if (NULL == qe->env)
1464  {
1465  free_queue_entry (qe);
1466  h->skip_next_messages++;
1467  return;
1468  }
1469  free_queue_entry (qe);
1470  process_queue (h);
1471 }
1472 
1473 
1474 /* end of datastore_api.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_DATASTORE_STATUS
Message sent by datastore to client informing about status processing a request (in response to RESER...
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
unsigned int queue_size
Number of entries in the queue.
static struct GNUNET_DATASTORE_QueueEntry * qe
Current operation.
uint32_t random
If true return a random result.
Definition: datastore.h:127
struct GNUNET_MQ_Handle * mq
Current connection to the datastore service.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:673
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
struct GNUNET_TIME_AbsoluteNBO expiration
Expiration time (NBO); zero for remove.
Definition: datastore.h:247
uint32_t type
Desired content type (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:178
uint32_t entries
Number of items to reserve.
Definition: datastore.h:49
Any type of block, used as a wildcard when searching.
Message from datastore service informing client about the current size of the datastore.
Definition: datastore.h:39
struct GNUNET_DATASTORE_QueueEntry * prev
This is a linked list.
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:1057
struct GNUNET_DATASTORE_QueueEntry * queue_tail
Current tail of priority queue.
struct GNUNET_DATASTORE_Handle * h
Handle to the master context.
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
static void try_reconnect(void *cls)
Try reconnecting to the datastore service.
uint32_t type
Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:214
GNUNET_MQ_Error
Error codes for the queue.
static void process_queue(struct GNUNET_DATASTORE_Handle *h)
Process entries in the queue (or do nothing if we are already doing so).
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
struct GNUNET_STATISTICS_Handle * stats
Handle for statistics.
struct GNUNET_MQ_Envelope * env
Envelope of the request to transmit, NULL after transmission.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
void * proc_cls
Closure for proc.
Definition: datastore_api.c:81
uint32_t random
If true return a random result.
Definition: datastore.h:160
static void delay_warning(void *cls)
Task that logs an error after some time.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Reserve space in the datastore.
unsigned int result_count
Number of results we're receiving for the current query after application stopped to care...
Message from datastore client informing service that the remainder of the reserved bytes can now be r...
Definition: datastore.h:89
static unsigned int replication
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_DATASTORE_ContinuationWithStatus cont
Function to call after transmission of the request.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
struct GNUNET_SCHEDULER_Task * reconnect_task
Task for trying to reconnect.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION
Message sent by datastore client to get random data.
Message to the datastore service asking about specific content.
Definition: datastore.h:107
struct GNUNET_TIME_AbsoluteNBO min_expiration
Minimum expiration time required for content to be stored by the datacache at this time...
Definition: datastore.h:80
#define GNUNET_NO
Definition: gnunet_common.h:78
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
static void handle_data(void *cls, const struct DataMessage *dm)
Handle data message we got from the service.
Message to the datastore service asking about zero anonymity content.
Definition: datastore.h:168
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static struct GNUNET_DATASTORE_QueueEntry * get_queue_head(struct GNUNET_DATASTORE_Handle *h, uint16_t response_type)
Get the entry at the head of the message queue.
static void handle_status(void *cls, const struct StatusMessage *sm)
Function called to handle status message from the service.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:155
#define GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_PUT
Message sent by datastore client to store data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Signal that all of the data for which a reservation was made has been stored and that whatever excess...
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
Handle for the service.
const struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
static void drop_status_cont(void *cls, int32_t result, struct GNUNET_TIME_Absolute min_expiration, const char *emsg)
Dummy continuation used to do nothing (but be non-zero).
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:99
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
struct StatusContext sc
Definition: datastore_api.c:90
#define GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE
Message sent by datastore client on join.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:104
structs for communication between datastore service and API
GNUNET_DATASTORE_DatumProcessor proc
Function to call with the result.
Definition: datastore_api.c:76
struct ResultContext rc
Definition: datastore_api.c:92
unsigned int max_queue
Maximum allowed length of queue (otherwise this request should be discarded).
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1253
static void free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
Free a queue entry.
Entry in our priority queue.
Definition: datastore_api.c:99
#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP
Message sent by datastore client to drop the database.
void * cls
Closure for mv and cb.
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:774
unsigned int skip_next_messages
We should ignore the next message(s) from the service.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
Message to the datastore service asking about specific content.
Definition: datastore.h:140
uint32_t type
Desired content type.
Definition: datastore.h:117
struct ListEntry * entries
List of peers in the list.
GNUNET_DATASTORE_ContinuationWithStatus cont
Continuation to call with the status.
Definition: datastore_api.c:59
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:687
void(* GNUNET_DATASTORE_DatumProcessor)(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process a datum that was stored in the datastore.
uint16_t status
See PRISM_STATUS_*-constants.
static int result
Global testing status.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
static void disconnect_after_drop(void *cls)
Task used to disconnect from the datastore after we send the GNUNET_MESSAGE_TYPE_DATASTORE_DROP me...
A 512-bit hashcode.
static char * expiration
Credential TTL.
Definition: gnunet-abd.c:96
Message handler for a specific message type.
uint32_t type
Desired content type.
Definition: datastore.h:150
#define LOG(kind,...)
Definition: datastore_api.c:34
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END
Message sent by datastore to client signaling end of matching data.
uint32_t replication
Desired replication level.
Definition: datastore.h:229
static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry(struct GNUNET_DATASTORE_Handle *h, struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, uint16_t expected_type, const union QueueContext *qc)
Create a new entry for our priority queue (and possibly discard other entries if the queue is getting...
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:321
uint32_t rid
Reservation ID to use; use zero for none.
Definition: datastore.h:204
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
union QueueContext qc
Context for the operation.
struct GNUNET_SCHEDULER_Task * delay_warn_task
Task we run if this entry stalls the queue and we need to warn the user.
Message transmitting content from or to the datastore service.
Definition: datastore.h:191
struct GNUNET_HashCode key
Desired key.
Definition: datastore.h:132
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:67
Context for a queue operation.
Definition: datastore_api.c:88
uint32_t size
Number of bytes in the item (NBO).
Definition: datastore.h:209
uint32_t anonymity
Desired anonymity level (NBO), zero for remove.
Definition: datastore.h:224
void(* GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, int32_t success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static struct GNUNET_FS_SearchContext * sc
Definition: gnunet-search.c:37
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
int32_t rid
Reservation id.
Definition: datastore.h:99
Context for processing result messages.
Definition: datastore_api.c:71
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message...
struct GNUNET_MessageHeader header
Type is either GNUNET_MESSAGE_TYPE_DATASTORE_PUT, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE or GNUNET_MESS...
Definition: datastore.h:199
static void disconnect_on_mq_error(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
Handle to a message queue.
Definition: mq.c:85
static int check_data(void *cls, const struct DataMessage *dm)
Check data message we received from the service.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:183
uint64_t uid
Unique ID for the content (can be used for UPDATE); can be zero for remove (which indicates that the ...
Definition: datastore.h:242
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY
Message sent by datastore client to get random data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY
Message sent by datastore client to get data by key.
configuration data
Definition: configuration.c:85
void GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, int drop)
Disconnect from the datastore service (and free associated resources).
struct GNUNET_HashCode key
Key under which the item can be found.
Definition: datastore.h:252
static void do_disconnect(struct GNUNET_DATASTORE_Handle *h)
Disconnect from the service and then try reconnecting to the datastore service after some delay...
uint32_t priority
Priority of the item (NBO), zero for remove.
Definition: datastore.h:219
Handle to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * queue_head
Current head of priority queue.
static void handle_data_end(void *cls, const struct GNUNET_MessageHeader *msg)
Type of a function to call when we receive a GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the ...
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
Message from datastore service informing client about the success or failure of a requested operation...
Definition: datastore.h:64
struct GNUNET_TIME_Relative retry_time
How quickly should we retry? Used for exponential back-off on connect-errors.
uint16_t response_type
Expected response type.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
void * cont_cls
Closure for cont.
Header for all communications.
Time for absolute times used by GNUnet, in microseconds.
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:824
#define GNUNET_YES
Definition: gnunet_common.h:77
static unsigned int anonymity
int32_t status
Status code, -1 for errors.
Definition: datastore.h:74
void * cont_cls
Closure for cont.
Definition: datastore_api.c:64
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:353
struct GNUNET_DATASTORE_QueueEntry * next
This is a linked list.
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE
Message sent by datastore client to remove data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:122
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET
Message sent by datastore client to get data.
uint32_t data
The data value.
struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the datastore service.
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:657
unsigned int priority
Priority in the queue.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_DATASTORE_STATUS.
Definition: datastore.h:69
static int check_status(void *cls, const struct StatusMessage *sm)
Function called to check status message from the service.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define DELAY_WARN_TIMEOUT
Definition: datastore_api.c:36
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, unsigned int queue_priority, unsigned int max_queue_size, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a single zero-anonymity value from the datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA
Message sent by datastore to client providing requested data (in response to GET or GET_RANDOM reques...
Context for processing status messages.
Definition: datastore_api.c:54
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
#define GNUNET_free(ptr)
Wrapper around free.
Time for relative time used by GNUnet, in microseconds.
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:966
uint64_t amount
Number of bytes to reserve.
Definition: datastore.h:54