GNUnet  0.11.x
datastore_api.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2004-2013, 2016 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
32 #include "datastore.h"
33 
34 #define LOG(kind, ...) GNUNET_log_from (kind, "datastore-api", __VA_ARGS__)
35 
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 
41 #define INSANE_STATISTICS GNUNET_NO
42 
49 #define MAX_EXCESS_RESULTS 8
50 
55 {
60 
64  void *cont_cls;
65 };
66 
67 
72 {
77 
81  void *proc_cls;
82 };
83 
84 
89 {
90  struct StatusContext sc;
91 
92  struct ResultContext rc;
93 };
94 
95 
100 {
105 
110 
115 
120 
124  void *cont_cls;
125 
129  union QueueContext qc;
130 
136 
142 
146  unsigned int priority;
147 
152  unsigned int max_queue;
153 
157  uint16_t response_type;
158 };
159 
160 
165 {
170 
175 
180 
185 
190 
195 
201 
205  unsigned int queue_size;
206 
212  unsigned int result_count;
213 
217  unsigned int skip_next_messages;
218 };
219 
220 
226 static void
227 try_reconnect (void *cls);
228 
229 
236 static void
238 {
239  if (NULL == h->mq)
240  {
241  GNUNET_break (0);
242  return;
243  }
245  h->mq = NULL;
246  h->skip_next_messages = 0;
248  = GNUNET_SCHEDULER_add_delayed (h->retry_time,
249  &try_reconnect,
250  h);
251 }
252 
253 
261 static void
263 {
264  struct GNUNET_DATASTORE_Handle *h = qe->h;
265 
266  GNUNET_CONTAINER_DLL_remove (h->queue_head,
267  h->queue_tail,
268  qe);
269  h->queue_size--;
270  if (NULL != qe->env)
272  if (NULL != qe->delay_warn_task)
274  GNUNET_free (qe);
275 }
276 
277 
283 static void
284 delay_warning (void *cls)
285 {
286  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
287 
288  qe->delay_warn_task = NULL;
290  "Request %p of type %u at head of datastore queue for more than %s\n",
291  qe,
292  (unsigned int) qe->response_type,
294  GNUNET_YES));
296  &delay_warning,
297  qe);
298 }
299 
300 
307 static void
308 mq_error_handler (void *cls,
309  enum GNUNET_MQ_Error error)
310 {
311  struct GNUNET_DATASTORE_Handle *h = cls;
313 
315  "MQ error, reconnecting to DATASTORE\n");
316  do_disconnect (h);
317  qe = h->queue_head;
318  if (NULL == qe)
319  return;
320  if (NULL != qe->delay_warn_task)
321  {
323  qe->delay_warn_task = NULL;
324  }
325  if (NULL == qe->env)
326  {
327  union QueueContext qc = qe->qc;
328  uint16_t rt = qe->response_type;
329 
331  "Failed to receive response from database.\n");
333  switch (rt)
334  {
336  if (NULL != qc.sc.cont)
337  qc.sc.cont (qc.sc.cont_cls,
340  _ ("DATASTORE disconnected"));
341  break;
342 
344  if (NULL != qc.rc.proc)
345  qc.rc.proc (qc.rc.proc_cls,
346  NULL,
347  0,
348  NULL,
349  0,
350  0,
351  0,
352  0,
354  0);
355  break;
356 
357  default:
358  GNUNET_break (0);
359  }
360  }
361 }
362 
363 
372 {
373  struct GNUNET_DATASTORE_Handle *h;
374 
376  "Establishing DATASTORE connection!\n");
378  h->cfg = cfg;
379  try_reconnect (h);
380  if (NULL == h->mq)
381  {
382  GNUNET_free (h);
383  return NULL;
384  }
385  h->stats = GNUNET_STATISTICS_create ("datastore-api",
386  cfg);
387  return h;
388 }
389 
390 
397 static void
399 {
400  struct GNUNET_DATASTORE_Handle *h = cls;
401 
403  "Drop sent, disconnecting\n");
405  GNUNET_NO);
406 }
407 
408 
415 static void
417  enum GNUNET_MQ_Error error)
418 {
419  struct GNUNET_DATASTORE_Handle *h = cls;
420 
422  "Failed to ask datastore to drop tables\n");
424  GNUNET_NO);
425 }
426 
427 
435 void
437  int drop)
438 {
440 
442  "Datastore disconnect\n");
443  if (NULL != h->mq)
444  {
446  h->mq = NULL;
447  }
448  if (NULL != h->reconnect_task)
449  {
451  h->reconnect_task = NULL;
452  }
453  while (NULL != (qe = h->queue_head))
454  {
455  switch (qe->response_type)
456  {
458  if (NULL != qe->qc.sc.cont)
459  qe->qc.sc.cont (qe->qc.sc.cont_cls,
462  _ ("Disconnected from DATASTORE"));
463  break;
464 
466  if (NULL != qe->qc.rc.proc)
467  qe->qc.rc.proc (qe->qc.rc.proc_cls,
468  NULL,
469  0,
470  NULL,
471  0,
472  0,
473  0,
474  0,
476  0);
477  break;
478 
479  default:
480  GNUNET_break (0);
481  }
483  }
484  if (GNUNET_YES == drop)
485  {
487  "Re-connecting to issue DROP!\n");
488  GNUNET_assert (NULL == h->mq);
490  "datastore",
491  NULL,
493  h);
494  if (NULL != h->mq)
495  {
496  struct GNUNET_MessageHeader *hdr;
497  struct GNUNET_MQ_Envelope *env;
498 
499  env = GNUNET_MQ_msg (hdr,
503  h);
504  GNUNET_MQ_send (h->mq,
505  env);
506  return;
507  }
508  GNUNET_break (0);
509  }
511  GNUNET_NO);
512  h->stats = NULL;
513  GNUNET_free (h);
514 }
515 
516 
532 static struct GNUNET_DATASTORE_QueueEntry *
534  struct GNUNET_MQ_Envelope *env,
535  unsigned int queue_priority,
536  unsigned int max_queue_size,
537  uint16_t expected_type,
538  const union QueueContext *qc)
539 {
541  struct GNUNET_DATASTORE_QueueEntry *pos;
542  unsigned int c;
543 
544  if ((NULL != h->queue_tail) &&
545  (h->queue_tail->priority >= queue_priority))
546  {
547  c = h->queue_size;
548  pos = NULL;
549  }
550  else
551  {
552  c = 0;
553  pos = h->queue_head;
554  }
555  while ((NULL != pos) &&
556  (c < max_queue_size) &&
557  (pos->priority >= queue_priority))
558  {
559  c++;
560  pos = pos->next;
561  }
562  if (c >= max_queue_size)
563  {
564  GNUNET_STATISTICS_update (h->stats,
565  gettext_noop ("# queue overflows"),
566  1,
567  GNUNET_NO);
569  return NULL;
570  }
572  qe->h = h;
573  qe->env = env;
574  qe->response_type = expected_type;
575  qe->qc = *qc;
576  qe->priority = queue_priority;
577  qe->max_queue = max_queue_size;
578  if (NULL == pos)
579  {
580  /* append at the tail */
581  pos = h->queue_tail;
582  }
583  else
584  {
585  pos = pos->prev;
586  /* do not insert at HEAD if HEAD query was already
587  * transmitted and we are still receiving replies! */
588  if ((NULL == pos) &&
589  (NULL == h->queue_head->env))
590  pos = h->queue_head;
591  }
592  c++;
593 #if INSANE_STATISTICS
594  GNUNET_STATISTICS_update (h->stats,
595  gettext_noop ("# queue entries created"),
596  1,
597  GNUNET_NO);
598 #endif
600  h->queue_tail,
601  pos,
602  qe);
603  h->queue_size++;
604  return qe;
605 }
606 
607 
614 static void
616 {
618 
619  if (NULL == (qe = h->queue_head))
620  {
621  /* no entry in queue */
623  "Queue empty\n");
624  return;
625  }
626  if (NULL == qe->env)
627  {
628  /* waiting for replies */
630  "Head request already transmitted\n");
631  return;
632  }
633  if (NULL == h->mq)
634  {
635  /* waiting for reconnect */
637  "Not connected\n");
638  return;
639  }
640  GNUNET_assert (NULL == qe->delay_warn_task);
642  &delay_warning,
643  qe);
644  GNUNET_MQ_send (h->mq,
645  qe->env);
646  qe->env = NULL;
647 }
648 
649 
657 static struct GNUNET_DATASTORE_QueueEntry *
659  uint16_t response_type)
660 {
662 
663  if (h->skip_next_messages > 0)
664  {
665  h->skip_next_messages--;
666  process_queue (h);
667  return NULL;
668  }
669  qe = h->queue_head;
670  if (NULL == qe)
671  {
672  GNUNET_break (0);
673  do_disconnect (h);
674  return NULL;
675  }
676  if (NULL != qe->env)
677  {
678  GNUNET_break (0);
679  do_disconnect (h);
680  return NULL;
681  }
683  {
684  GNUNET_break (0);
685  do_disconnect (h);
686  return NULL;
687  }
688  return qe;
689 }
690 
691 
699 static int
700 check_status (void *cls,
701  const struct StatusMessage *sm)
702 {
703  uint16_t msize = ntohs (sm->header.size) - sizeof(*sm);
704  int32_t status = ntohl (sm->status);
705 
706  if (msize > 0)
707  {
708  const char *emsg = (const char *) &sm[1];
709 
710  if ('\0' != emsg[msize - 1])
711  {
712  GNUNET_break (0);
713  return GNUNET_SYSERR;
714  }
715  }
716  else if (GNUNET_SYSERR == status)
717  {
718  GNUNET_break (0);
719  return GNUNET_SYSERR;
720  }
721  return GNUNET_OK;
722 }
723 
724 
731 static void
732 handle_status (void *cls,
733  const struct StatusMessage *sm)
734 {
735  struct GNUNET_DATASTORE_Handle *h = cls;
737  struct StatusContext rc;
738  const char *emsg;
739  int32_t status = ntohl (sm->status);
740 
741  qe = get_queue_head (h,
743  if (NULL == qe)
744  return;
745  rc = qe->qc.sc;
747  if (ntohs (sm->header.size) > sizeof(struct StatusMessage))
748  emsg = (const char *) &sm[1];
749  else
750  emsg = NULL;
752  "Received status %d/%s\n",
753  (int) status,
754  emsg);
755  GNUNET_STATISTICS_update (h->stats,
756  gettext_noop ("# status messages received"),
757  1,
758  GNUNET_NO);
759  h->retry_time = GNUNET_TIME_UNIT_ZERO;
760  process_queue (h);
761  if (NULL != rc.cont)
762  rc.cont (rc.cont_cls,
763  status,
765  emsg);
766 }
767 
768 
775 static int
776 check_data (void *cls,
777  const struct DataMessage *dm)
778 {
779  uint16_t msize = ntohs (dm->header.size) - sizeof(*dm);
780 
781  if (msize != ntohl (dm->size))
782  {
783  GNUNET_break (0);
784  return GNUNET_SYSERR;
785  }
786  return GNUNET_OK;
787 }
788 
789 
796 static void
797 handle_data (void *cls,
798  const struct DataMessage *dm)
799 {
800  struct GNUNET_DATASTORE_Handle *h = cls;
802  struct ResultContext rc;
803 
804  qe = get_queue_head (h,
806  if (NULL == qe)
807  return;
808 #if INSANE_STATISTICS
809  GNUNET_STATISTICS_update (h->stats,
810  gettext_noop ("# Results received"),
811  1,
812  GNUNET_NO);
813 #endif
815  "Received result %llu with type %u and size %u with key %s\n",
816  (unsigned long long) GNUNET_ntohll (dm->uid),
817  ntohl (dm->type),
818  ntohl (dm->size),
819  GNUNET_h2s (&dm->key));
820  rc = qe->qc.rc;
822  h->retry_time = GNUNET_TIME_UNIT_ZERO;
823  process_queue (h);
824  if (NULL != rc.proc)
825  rc.proc (rc.proc_cls,
826  &dm->key,
827  ntohl (dm->size),
828  &dm[1],
829  ntohl (dm->type),
830  ntohl (dm->priority),
831  ntohl (dm->anonymity),
832  ntohl (dm->replication),
834  GNUNET_ntohll (dm->uid));
835 }
836 
837 
845 static void
846 handle_data_end (void *cls,
847  const struct GNUNET_MessageHeader *msg)
848 {
849  struct GNUNET_DATASTORE_Handle *h = cls;
851  struct ResultContext rc;
852 
853  qe = get_queue_head (h,
855  if (NULL == qe)
856  return;
857  rc = qe->qc.rc;
860  "Received end of result set, new queue size is %u\n",
861  h->queue_size);
862  h->retry_time = GNUNET_TIME_UNIT_ZERO;
863  h->result_count = 0;
864  process_queue (h);
865  /* signal end of iteration */
866  if (NULL != rc.proc)
867  rc.proc (rc.proc_cls,
868  NULL,
869  0,
870  NULL,
871  0,
872  0,
873  0,
874  0,
876  0);
877 }
878 
879 
885 static void
886 try_reconnect (void *cls)
887 {
888  struct GNUNET_DATASTORE_Handle *h = cls;
892  struct StatusMessage,
893  h),
896  struct DataMessage,
897  h),
898  GNUNET_MQ_hd_fixed_size (data_end,
900  struct GNUNET_MessageHeader,
901  h),
903  };
904 
905  h->retry_time = GNUNET_TIME_STD_BACKOFF (h->retry_time);
906  h->reconnect_task = NULL;
907  GNUNET_assert (NULL == h->mq);
909  "datastore",
910  handlers,
912  h);
913  if (NULL == h->mq)
914  return;
915  GNUNET_STATISTICS_update (h->stats,
916  gettext_noop (
917  "# datastore connections (re)created"),
918  1,
919  GNUNET_NO);
921  "Reconnected to DATASTORE\n");
922  process_queue (h);
923 }
924 
925 
934 static void
936  int32_t result,
938  const char *emsg)
939 {
940  /* do nothing */
941 }
942 
943 
971  uint32_t rid,
972  const struct GNUNET_HashCode *key,
973  size_t size,
974  const void *data,
975  enum GNUNET_BLOCK_Type type,
976  uint32_t priority,
977  uint32_t anonymity,
978  uint32_t replication,
980  unsigned int queue_priority,
981  unsigned int max_queue_size,
983  void *cont_cls)
984 {
986  struct GNUNET_MQ_Envelope *env;
987  struct DataMessage *dm;
988  union QueueContext qc;
989 
990  if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
991  {
992  GNUNET_break (0);
993  return NULL;
994  }
995 
997  "Asked to put %lu bytes of data under key `%s' for %s\n",
998  (unsigned long) size,
999  GNUNET_h2s (key),
1002  GNUNET_YES));
1003  env = GNUNET_MQ_msg_extra (dm,
1004  size,
1006  dm->rid = htonl (rid);
1007  dm->size = htonl ((uint32_t) size);
1008  dm->type = htonl (type);
1009  dm->priority = htonl (priority);
1010  dm->anonymity = htonl (anonymity);
1011  dm->replication = htonl (replication);
1013  dm->key = *key;
1014  GNUNET_memcpy (&dm[1],
1015  data,
1016  size);
1017  qc.sc.cont = cont;
1018  qc.sc.cont_cls = cont_cls;
1019  qe = make_queue_entry (h,
1020  env,
1021  queue_priority,
1022  max_queue_size,
1024  &qc);
1025  if (NULL == qe)
1026  {
1028  "Could not create queue entry for PUT\n");
1029  return NULL;
1030  }
1031  GNUNET_STATISTICS_update (h->stats,
1032  gettext_noop ("# PUT requests executed"),
1033  1,
1034  GNUNET_NO);
1035  process_queue (h);
1036  return qe;
1037 }
1038 
1039 
1057  uint64_t amount,
1058  uint32_t entries,
1060  void *cont_cls)
1061 {
1063  struct GNUNET_MQ_Envelope *env;
1064  struct ReserveMessage *rm;
1065  union QueueContext qc;
1066 
1067  if (NULL == cont)
1068  cont = &drop_status_cont;
1070  "Asked to reserve %llu bytes of data and %u entries\n",
1071  (unsigned long long) amount,
1072  (unsigned int) entries);
1073  env = GNUNET_MQ_msg (rm,
1075  rm->entries = htonl (entries);
1076  rm->amount = GNUNET_htonll (amount);
1077 
1078  qc.sc.cont = cont;
1079  qc.sc.cont_cls = cont_cls;
1080  qe = make_queue_entry (h,
1081  env,
1082  UINT_MAX,
1083  UINT_MAX,
1085  &qc);
1086  if (NULL == qe)
1087  {
1089  "Could not create queue entry to reserve\n");
1090  return NULL;
1091  }
1092  GNUNET_STATISTICS_update (h->stats,
1093  gettext_noop ("# RESERVE requests executed"),
1094  1,
1095  GNUNET_NO);
1096  process_queue (h);
1097  return qe;
1098 }
1099 
1100 
1123  uint32_t rid,
1124  unsigned int queue_priority,
1125  unsigned int max_queue_size,
1127  void *cont_cls)
1128 {
1130  struct GNUNET_MQ_Envelope *env;
1131  struct ReleaseReserveMessage *rrm;
1132  union QueueContext qc;
1133 
1134  if (NULL == cont)
1135  cont = &drop_status_cont;
1137  "Asked to release reserve %d\n",
1138  rid);
1139  env = GNUNET_MQ_msg (rrm,
1141  rrm->rid = htonl (rid);
1142  qc.sc.cont = cont;
1143  qc.sc.cont_cls = cont_cls;
1144  qe = make_queue_entry (h,
1145  env,
1146  queue_priority,
1147  max_queue_size,
1149  &qc);
1150  if (NULL == qe)
1151  {
1153  "Could not create queue entry to release reserve\n");
1154  return NULL;
1155  }
1156  GNUNET_STATISTICS_update (h->stats,
1157  gettext_noop
1158  ("# RELEASE RESERVE requests executed"), 1,
1159  GNUNET_NO);
1160  process_queue (h);
1161  return qe;
1162 }
1163 
1164 
1187  const struct GNUNET_HashCode *key,
1188  size_t size,
1189  const void *data,
1190  unsigned int queue_priority,
1191  unsigned int max_queue_size,
1193  void *cont_cls)
1194 {
1196  struct DataMessage *dm;
1197  struct GNUNET_MQ_Envelope *env;
1198  union QueueContext qc;
1199 
1200  if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1201  {
1202  GNUNET_break (0);
1203  return NULL;
1204  }
1205  if (NULL == cont)
1206  cont = &drop_status_cont;
1208  "Asked to remove %lu bytes under key `%s'\n",
1209  (unsigned long) size,
1210  GNUNET_h2s (key));
1211  env = GNUNET_MQ_msg_extra (dm,
1212  size,
1214  dm->size = htonl (size);
1215  dm->key = *key;
1216  GNUNET_memcpy (&dm[1],
1217  data,
1218  size);
1219 
1220  qc.sc.cont = cont;
1221  qc.sc.cont_cls = cont_cls;
1222 
1223  qe = make_queue_entry (h,
1224  env,
1225  queue_priority,
1226  max_queue_size,
1228  &qc);
1229  if (NULL == qe)
1230  {
1232  "Could not create queue entry for REMOVE\n");
1233  return NULL;
1234  }
1235  GNUNET_STATISTICS_update (h->stats,
1236  gettext_noop ("# REMOVE requests executed"),
1237  1,
1238  GNUNET_NO);
1239  process_queue (h);
1240  return qe;
1241 }
1242 
1243 
1264  unsigned int queue_priority,
1265  unsigned int max_queue_size,
1267  void *proc_cls)
1268 {
1270  struct GNUNET_MQ_Envelope *env;
1271  struct GNUNET_MessageHeader *m;
1272  union QueueContext qc;
1273 
1274  GNUNET_assert (NULL != proc);
1276  "Asked to get replication entry\n");
1277  env = GNUNET_MQ_msg (m,
1279  qc.rc.proc = proc;
1280  qc.rc.proc_cls = proc_cls;
1281  qe = make_queue_entry (h,
1282  env,
1283  queue_priority,
1284  max_queue_size,
1286  &qc);
1287  if (NULL == qe)
1288  {
1290  "Could not create queue entry for GET REPLICATION\n");
1291  return NULL;
1292  }
1293  GNUNET_STATISTICS_update (h->stats,
1294  gettext_noop
1295  ("# GET REPLICATION requests executed"), 1,
1296  GNUNET_NO);
1297  process_queue (h);
1298  return qe;
1299 }
1300 
1301 
1320  uint64_t next_uid,
1321  unsigned int queue_priority,
1322  unsigned int max_queue_size,
1323  enum GNUNET_BLOCK_Type type,
1325  void *proc_cls)
1326 {
1328  struct GNUNET_MQ_Envelope *env;
1329  struct GetZeroAnonymityMessage *m;
1330  union QueueContext qc;
1331 
1332  GNUNET_assert (NULL != proc);
1335  "Asked to get a zero-anonymity entry of type %d\n",
1336  type);
1337  env = GNUNET_MQ_msg (m,
1339  m->type = htonl ((uint32_t) type);
1340  m->next_uid = GNUNET_htonll (next_uid);
1341  qc.rc.proc = proc;
1342  qc.rc.proc_cls = proc_cls;
1343  qe = make_queue_entry (h,
1344  env,
1345  queue_priority,
1346  max_queue_size,
1348  &qc);
1349  if (NULL == qe)
1350  {
1352  "Could not create queue entry for zero-anonymity procation\n");
1353  return NULL;
1354  }
1355  GNUNET_STATISTICS_update (h->stats,
1356  gettext_noop
1357  ("# GET ZERO ANONYMITY requests executed"), 1,
1358  GNUNET_NO);
1359  process_queue (h);
1360  return qe;
1361 }
1362 
1363 
1384  uint64_t next_uid,
1385  bool random,
1386  const struct GNUNET_HashCode *key,
1387  enum GNUNET_BLOCK_Type type,
1388  unsigned int queue_priority,
1389  unsigned int max_queue_size,
1391  void *proc_cls)
1392 {
1394  struct GNUNET_MQ_Envelope *env;
1395  struct GetKeyMessage *gkm;
1396  struct GetMessage *gm;
1397  union QueueContext qc;
1398 
1399  GNUNET_assert (NULL != proc);
1401  "Asked to look for data of type %u under key `%s'\n",
1402  (unsigned int) type,
1403  (NULL != key) ? GNUNET_h2s (key) : "NULL");
1404  if (NULL == key)
1405  {
1406  env = GNUNET_MQ_msg (gm,
1408  gm->type = htonl (type);
1409  gm->next_uid = GNUNET_htonll (next_uid);
1410  gm->random = random;
1411  }
1412  else
1413  {
1414  env = GNUNET_MQ_msg (gkm,
1416  gkm->type = htonl (type);
1417  gkm->next_uid = GNUNET_htonll (next_uid);
1418  gkm->random = random;
1419  gkm->key = *key;
1420  }
1421  qc.rc.proc = proc;
1422  qc.rc.proc_cls = proc_cls;
1423  qe = make_queue_entry (h,
1424  env,
1425  queue_priority,
1426  max_queue_size,
1428  &qc);
1429  if (NULL == qe)
1430  {
1432  "Could not queue request for `%s'\n",
1433  (NULL != key) ? GNUNET_h2s (key): "NULL");
1434  return NULL;
1435  }
1436 #if INSANE_STATISTICS
1437  GNUNET_STATISTICS_update (h->stats,
1438  gettext_noop ("# GET requests executed"),
1439  1,
1440  GNUNET_NO);
1441 #endif
1442  process_queue (h);
1443  return qe;
1444 }
1445 
1446 
1453 void
1455 {
1456  struct GNUNET_DATASTORE_Handle *h = qe->h;
1457 
1459  "Pending DATASTORE request %p cancelled (%d, %d)\n",
1460  qe,
1461  NULL == qe->env,
1462  h->queue_head == qe);
1463  if (NULL == qe->env)
1464  {
1465  free_queue_entry (qe);
1466  h->skip_next_messages++;
1467  return;
1468  }
1469  free_queue_entry (qe);
1470  process_queue (h);
1471 }
1472 
1473 
1474 /* end of datastore_api.c */
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
structs for communication between datastore service and API
static int check_status(void *cls, const struct StatusMessage *sm)
Function called to check status message from the service.
static void process_queue(struct GNUNET_DATASTORE_Handle *h)
Process entries in the queue (or do nothing if we are already doing so).
static void disconnect_after_drop(void *cls)
Task used by to disconnect from the datastore after we send the GNUNET_MESSAGE_TYPE_DATASTORE_DROP me...
static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry(struct GNUNET_DATASTORE_Handle *h, struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, uint16_t expected_type, const union QueueContext *qc)
Create a new entry for our priority queue (and possibly discard other entries if the queue is getting...
static void disconnect_on_mq_error(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
static void try_reconnect(void *cls)
Try reconnecting to the datastore service.
static struct GNUNET_DATASTORE_QueueEntry * get_queue_head(struct GNUNET_DATASTORE_Handle *h, uint16_t response_type)
Get the entry at the head of the message queue.
#define DELAY_WARN_TIMEOUT
Definition: datastore_api.c:36
static void delay_warning(void *cls)
Task that logs an error after some time.
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
static void do_disconnect(struct GNUNET_DATASTORE_Handle *h)
Disconnect from the service and then try reconnecting to the datastore service after some delay.
static void drop_status_cont(void *cls, int32_t result, struct GNUNET_TIME_Absolute min_expiration, const char *emsg)
Dummy continuation used to do nothing (but be non-zero).
static void free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
Free a queue entry.
static int check_data(void *cls, const struct DataMessage *dm)
Check data message we received from the service.
static void handle_status(void *cls, const struct StatusMessage *sm)
Function called to handle status message from the service.
static void handle_data_end(void *cls, const struct GNUNET_MessageHeader *msg)
Type of a function to call when we receive a GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the ...
#define LOG(kind,...)
Definition: datastore_api.c:34
static void handle_data(void *cls, const struct DataMessage *dm)
Handle data message we got from the service.
#define gettext_noop(String)
Definition: gettext.h:69
static char * expiration
Credential TTL.
Definition: gnunet-abd.c:96
static const struct GNUNET_CONFIGURATION_Handle * cfg
Configuration we are using.
Definition: gnunet-abd.c:36
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:104
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:99
static struct GNUNET_CADET_MessageHandler handlers[]
Handlers, for diverse services.
static unsigned int replication
static struct GNUNET_DATASTORE_QueueEntry * qe
Current operation.
struct GNUNET_HashCode key
The key used in the DHT.
static unsigned int anonymity
uint32_t data
The data value.
uint16_t status
See PRISM_STATUS_*-constants.
static int result
Global testing status.
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
#define GNUNET_log(kind,...)
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:53
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:36
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
@ GNUNET_OK
Definition: gnunet_common.h:95
@ GNUNET_YES
Definition: gnunet_common.h:97
@ GNUNET_NO
Definition: gnunet_common.h:94
@ GNUNET_SYSERR
Definition: gnunet_common.h:93
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message,...
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
@ GNUNET_BLOCK_TYPE_ANY
Any type of block, used as a wildcard when searching.
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:1064
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, unsigned int queue_priority, unsigned int max_queue_size, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a single zero-anonymity value from the datastore.
struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the datastore service.
void GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, int drop)
Disconnect from the datastore service (and free associated resources).
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Reserve space in the datastore.
void(* GNUNET_DATASTORE_DatumProcessor)(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process a datum that was stored in the datastore.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Signal that all of the data for which a reservation was made has been stored and that whatever excess...
void(* GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, int32_t success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur.
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
#define GNUNET_new(type)
Allocate a struct or union of the given type.
#define GNUNET_free(ptr)
Wrapper around free.
GNUNET_MQ_Error
Error codes for the queue.
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:355
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:323
void * cls
Closure for mv and cb.
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct.
Definition: gnunet_mq_lib.h:52
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore.
Definition: mq.c:787
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:837
#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP
Message sent by datastore client to drop the database.
#define GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END
Message sent by datastore to client signaling end of matching data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY
Message sent by datastore client to get data by key.
#define GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE
Message sent by datastore client to remove data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET
Message sent by datastore client to get data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA
Message sent by datastore to client providing requested data (in response to GET or GET_RANDOM reques...
#define GNUNET_MESSAGE_TYPE_DATASTORE_PUT
Message sent by datastore client to store data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION
Message sent by datastore client to get random data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_STATUS
Message sent by datastore to client informing about status processing a request (in response to RESER...
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY
Message sent by datastore client to get random data.
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:972
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1269
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:232
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:542
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:557
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:464
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
static unsigned int size
Size of the "table".
Definition: peer.c:67
#define _(String)
GNU gettext support macro.
Definition: platform.h:177
Message transmitting content from or to the datastore service.
Definition: datastore.h:192
uint32_t priority
Priority of the item (NBO), zero for remove.
Definition: datastore.h:219
struct GNUNET_HashCode key
Key under which the item can be found.
Definition: datastore.h:252
uint64_t uid
Unique ID for the content (can be used for UPDATE); can be zero for remove (which indicates that the ...
Definition: datastore.h:242
struct GNUNET_TIME_AbsoluteNBO expiration
Expiration time (NBO); zero for remove.
Definition: datastore.h:247
struct GNUNET_MessageHeader header
Type is either GNUNET_MESSAGE_TYPE_DATASTORE_PUT, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE or GNUNET_MESS...
Definition: datastore.h:199
uint32_t type
Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:214
uint32_t size
Number of bytes in the item (NBO).
Definition: datastore.h:209
uint32_t replication
Desired replication level.
Definition: datastore.h:229
uint32_t anonymity
Desired anonymity level (NBO), zero for remove.
Definition: datastore.h:224
uint32_t rid
Reservation ID to use; use zero for none.
Definition: datastore.h:204
struct GNUNET_MQ_Handle * mq
Our connection to the ARM service.
Definition: arm_api.c:107
const struct GNUNET_CONFIGURATION_Handle * cfg
The configuration that we are using.
Definition: arm_api.c:112
struct GNUNET_SCHEDULER_Task * reconnect_task
ID of the reconnect task (if any).
Definition: arm_api.c:147
Handle to the datastore service.
unsigned int queue_size
Number of entries in the queue.
unsigned int skip_next_messages
We should ignore the next message(s) from the service.
struct GNUNET_TIME_Relative retry_time
How quickly should we retry? Used for exponential back-off on connect-errors.
struct GNUNET_DATASTORE_QueueEntry * queue_head
Current head of priority queue.
struct GNUNET_SCHEDULER_Task * reconnect_task
Task for trying to reconnect.
unsigned int result_count
Number of results we're receiving for the current query after application stopped to care.
struct GNUNET_STATISTICS_Handle * stats
Handle for statistics.
const struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
struct GNUNET_MQ_Handle * mq
Current connection to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * queue_tail
Current tail of priority queue.
Entry in our priority queue.
union QueueContext qc
Context for the operation.
struct GNUNET_DATASTORE_QueueEntry * prev
This is a linked list.
uint16_t response_type
Expected response type.
unsigned int priority
Priority in the queue.
struct GNUNET_DATASTORE_Handle * h
Handle to the master context.
unsigned int max_queue
Maximum allowed length of queue (otherwise this request should be discarded).
struct GNUNET_MQ_Envelope * env
Envelope of the request to transmit, NULL after transmission.
struct GNUNET_DATASTORE_QueueEntry * next
This is a linked list.
GNUNET_DATASTORE_ContinuationWithStatus cont
Function to call after transmission of the request.
void * cont_cls
Closure for cont.
struct GNUNET_SCHEDULER_Task * delay_warn_task
Task we run if this entry stalls the queue and we need to warn the user.
A 512-bit hashcode.
Handle to a message queue.
Definition: mq.c:86
Message handler for a specific message type.
Header for all communications.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
Entry in list of pending tasks.
Definition: scheduler.c:135
Handle for the service.
Time for absolute times used by GNUnet, in microseconds.
Time for relative time used by GNUnet, in microseconds.
Message to the datastore service asking about specific content.
Definition: datastore.h:108
uint32_t type
Desired content type.
Definition: datastore.h:117
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:122
struct GNUNET_HashCode key
Desired key.
Definition: datastore.h:132
uint32_t random
If true return a random result.
Definition: datastore.h:127
Message to the datastore service asking about specific content.
Definition: datastore.h:141
uint32_t random
If true return a random result.
Definition: datastore.h:160
uint32_t type
Desired content type.
Definition: datastore.h:150
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:155
Message to the datastore service asking about zero anonymity content.
Definition: datastore.h:169
Message from datastore client informing service that the remainder of the reserved bytes can now be r...
Definition: datastore.h:90
int32_t rid
Reservation id.
Definition: datastore.h:99
Message from datastore service informing client about the current size of the datastore.
Definition: datastore.h:40
uint32_t entries
Number of items to reserve.
Definition: datastore.h:49
uint64_t amount
Number of bytes to reserve.
Definition: datastore.h:54
Context for processing result messages.
Definition: datastore_api.c:72
GNUNET_DATASTORE_DatumProcessor proc
Function to call with the result.
Definition: datastore_api.c:76
void * proc_cls
Closure for proc.
Definition: datastore_api.c:81
Context for processing status messages.
Definition: datastore_api.c:55
void * cont_cls
Closure for cont.
Definition: datastore_api.c:64
GNUNET_DATASTORE_ContinuationWithStatus cont
Continuation to call with the status.
Definition: datastore_api.c:59
Message from datastore service informing client about the success or failure of a requested operation...
Definition: datastore.h:65
struct GNUNET_TIME_AbsoluteNBO min_expiration
Minimum expiration time required for content to be stored by the datacache at this time,...
Definition: datastore.h:80
int32_t status
Status code, -1 for errors.
Definition: datastore.h:74
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_DATASTORE_STATUS.
Definition: datastore.h:69
enum GNUNET_TESTBED_UnderlayLinkModelType type
the type of this model
struct ListEntry * entries
List of peers in the list.
Context for a queue operation.
Definition: datastore_api.c:89
struct ResultContext rc
Definition: datastore_api.c:92
struct StatusContext sc
Definition: datastore_api.c:90