GNUnet  0.10.x
datastore_api.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2004-2013, 2016 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL-3.0-or-later
19 */
20 
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
32 #include "datastore.h"
33 
34 #define LOG(kind,...) GNUNET_log_from (kind, "datastore-api",__VA_ARGS__)
35 
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 
41 #define INSANE_STATISTICS GNUNET_NO
42 
49 #define MAX_EXCESS_RESULTS 8
50 
55 {
60 
64  void *cont_cls;
65 
66 };
67 
68 
73 {
78 
82  void *proc_cls;
83 
84 };
85 
86 
91 {
92 
93  struct StatusContext sc;
94 
95  struct ResultContext rc;
96 
97 };
98 
99 
104 {
105 
110 
115 
120 
125 
129  void *cont_cls;
130 
134  union QueueContext qc;
135 
141 
147 
151  unsigned int priority;
152 
157  unsigned int max_queue;
158 
162  uint16_t response_type;
163 
164 };
165 
166 
171 {
172 
177 
182 
187 
192 
197 
202 
207  struct GNUNET_TIME_Relative retry_time;
208 
212  unsigned int queue_size;
213 
219  unsigned int result_count;
220 
224  unsigned int skip_next_messages;
225 
226 };
227 
228 
234 static void
235 try_reconnect (void *cls);
236 
237 
244 static void
246 {
247  if (NULL == h->mq)
248  {
249  GNUNET_break (0);
250  return;
251  }
252  GNUNET_MQ_destroy (h->mq);
253  h->mq = NULL;
254  h->skip_next_messages = 0;
255  h->reconnect_task
257  &try_reconnect,
258  h);
259 }
260 
261 
269 static void
271 {
272  struct GNUNET_DATASTORE_Handle *h = qe->h;
273 
275  h->queue_tail,
276  qe);
277  h->queue_size--;
278  if (NULL != qe->env)
279  GNUNET_MQ_discard (qe->env);
280  if (NULL != qe->delay_warn_task)
282  GNUNET_free (qe);
283 }
284 
285 
291 static void
292 delay_warning (void *cls)
293 {
294  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
295 
296  qe->delay_warn_task = NULL;
298  "Request %p of type %u at head of datastore queue for more than %s\n",
299  qe,
300  (unsigned int) qe->response_type,
302  GNUNET_YES));
304  &delay_warning,
305  qe);
306 }
307 
308 
315 static void
316 mq_error_handler (void *cls,
317  enum GNUNET_MQ_Error error)
318 {
319  struct GNUNET_DATASTORE_Handle *h = cls;
321 
323  "MQ error, reconnecting to DATASTORE\n");
324  do_disconnect (h);
325  qe = h->queue_head;
326  if (NULL == qe)
327  return;
328  if (NULL != qe->delay_warn_task)
329  {
331  qe->delay_warn_task = NULL;
332  }
333  if (NULL == qe->env)
334  {
335  union QueueContext qc = qe->qc;
336  uint16_t rt = qe->response_type;
337 
339  "Failed to receive response from database.\n");
340  free_queue_entry (qe);
341  switch (rt)
342  {
344  if (NULL != qc.sc.cont)
345  qc.sc.cont (qc.sc.cont_cls,
348  _("DATASTORE disconnected"));
349  break;
351  if (NULL != qc.rc.proc)
352  qc.rc.proc (qc.rc.proc_cls,
353  NULL,
354  0,
355  NULL,
356  0,
357  0,
358  0,
359  0,
361  0);
362  break;
363  default:
364  GNUNET_break (0);
365  }
366  }
367 }
368 
369 
378 {
379  struct GNUNET_DATASTORE_Handle *h;
380 
382  "Establishing DATASTORE connection!\n");
383  h = GNUNET_new (struct GNUNET_DATASTORE_Handle);
384  h->cfg = cfg;
385  try_reconnect (h);
386  if (NULL == h->mq)
387  {
388  GNUNET_free (h);
389  return NULL;
390  }
391  h->stats = GNUNET_STATISTICS_create ("datastore-api",
392  cfg);
393  return h;
394 }
395 
396 
403 static void
405 {
406  struct GNUNET_DATASTORE_Handle *h = cls;
407 
409  "Drop sent, disconnecting\n");
411  GNUNET_NO);
412 }
413 
414 
421 static void
423  enum GNUNET_MQ_Error error)
424 {
425  struct GNUNET_DATASTORE_Handle *h = cls;
426 
428  "Failed to ask datastore to drop tables\n");
430  GNUNET_NO);
431 }
432 
433 
441 void
443  int drop)
444 {
446 
448  "Datastore disconnect\n");
449  if (NULL != h->mq)
450  {
451  GNUNET_MQ_destroy (h->mq);
452  h->mq = NULL;
453  }
454  if (NULL != h->reconnect_task)
455  {
457  h->reconnect_task = NULL;
458  }
459  while (NULL != (qe = h->queue_head))
460  {
461  switch (qe->response_type)
462  {
464  if (NULL != qe->qc.sc.cont)
465  qe->qc.sc.cont (qe->qc.sc.cont_cls,
468  _("Disconnected from DATASTORE"));
469  break;
471  if (NULL != qe->qc.rc.proc)
472  qe->qc.rc.proc (qe->qc.rc.proc_cls,
473  NULL,
474  0,
475  NULL,
476  0,
477  0,
478  0,
479  0,
481  0);
482  break;
483  default:
484  GNUNET_break (0);
485  }
486  free_queue_entry (qe);
487  }
488  if (GNUNET_YES == drop)
489  {
491  "Re-connecting to issue DROP!\n");
492  GNUNET_assert (NULL == h->mq);
493  h->mq = GNUNET_CLIENT_connect (h->cfg,
494  "datastore",
495  NULL,
497  h);
498  if (NULL != h->mq)
499  {
500  struct GNUNET_MessageHeader *hdr;
501  struct GNUNET_MQ_Envelope *env;
502 
503  env = GNUNET_MQ_msg (hdr,
507  h);
508  GNUNET_MQ_send (h->mq,
509  env);
510  return;
511  }
512  GNUNET_break (0);
513  }
515  GNUNET_NO);
516  h->stats = NULL;
517  GNUNET_free (h);
518 }
519 
520 
536 static struct GNUNET_DATASTORE_QueueEntry *
538  struct GNUNET_MQ_Envelope *env,
539  unsigned int queue_priority,
540  unsigned int max_queue_size,
541  uint16_t expected_type,
542  const union QueueContext *qc)
543 {
545  struct GNUNET_DATASTORE_QueueEntry *pos;
546  unsigned int c;
547 
548  if ( (NULL != h->queue_tail) &&
549  (h->queue_tail->priority >= queue_priority) )
550  {
551  c = h->queue_size;
552  pos = NULL;
553  }
554  else
555  {
556  c = 0;
557  pos = h->queue_head;
558  }
559  while ( (NULL != pos) &&
560  (c < max_queue_size) &&
561  (pos->priority >= queue_priority) )
562  {
563  c++;
564  pos = pos->next;
565  }
566  if (c >= max_queue_size)
567  {
569  gettext_noop ("# queue overflows"),
570  1,
571  GNUNET_NO);
572  GNUNET_MQ_discard (env);
573  return NULL;
574  }
576  qe->h = h;
577  qe->env = env;
578  qe->response_type = expected_type;
579  qe->qc = *qc;
580  qe->priority = queue_priority;
581  qe->max_queue = max_queue_size;
582  if (NULL == pos)
583  {
584  /* append at the tail */
585  pos = h->queue_tail;
586  }
587  else
588  {
589  pos = pos->prev;
590  /* do not insert at HEAD if HEAD query was already
591  * transmitted and we are still receiving replies! */
592  if ( (NULL == pos) &&
593  (NULL == h->queue_head->env) )
594  pos = h->queue_head;
595  }
596  c++;
597 #if INSANE_STATISTICS
599  gettext_noop ("# queue entries created"),
600  1,
601  GNUNET_NO);
602 #endif
604  h->queue_tail,
605  pos,
606  qe);
607  h->queue_size++;
608  return qe;
609 }
610 
611 
618 static void
620 {
622 
623  if (NULL == (qe = h->queue_head))
624  {
625  /* no entry in queue */
627  "Queue empty\n");
628  return;
629  }
630  if (NULL == qe->env)
631  {
632  /* waiting for replies */
634  "Head request already transmitted\n");
635  return;
636  }
637  if (NULL == h->mq)
638  {
639  /* waiting for reconnect */
641  "Not connected\n");
642  return;
643  }
644  GNUNET_assert (NULL == qe->delay_warn_task);
646  &delay_warning,
647  qe);
648  GNUNET_MQ_send (h->mq,
649  qe->env);
650  qe->env = NULL;
651 }
652 
653 
661 static struct GNUNET_DATASTORE_QueueEntry *
663  uint16_t response_type)
664 {
666 
667  if (h->skip_next_messages > 0)
668  {
669  h->skip_next_messages--;
670  process_queue (h);
671  return NULL;
672  }
673  qe = h->queue_head;
674  if (NULL == qe)
675  {
676  GNUNET_break (0);
677  do_disconnect (h);
678  return NULL;
679  }
680  if (NULL != qe->env)
681  {
682  GNUNET_break (0);
683  do_disconnect (h);
684  return NULL;
685  }
686  if (response_type != qe->response_type)
687  {
688  GNUNET_break (0);
689  do_disconnect (h);
690  return NULL;
691  }
692  return qe;
693 }
694 
695 
703 static int
704 check_status (void *cls,
705  const struct StatusMessage *sm)
706 {
707  uint16_t msize = ntohs (sm->header.size) - sizeof (*sm);
708  int32_t status = ntohl (sm->status);
709 
710  if (msize > 0)
711  {
712  const char *emsg = (const char *) &sm[1];
713 
714  if ('\0' != emsg[msize - 1])
715  {
716  GNUNET_break (0);
717  return GNUNET_SYSERR;
718  }
719  }
720  else if (GNUNET_SYSERR == status)
721  {
722  GNUNET_break (0);
723  return GNUNET_SYSERR;
724  }
725  return GNUNET_OK;
726 }
727 
728 
735 static void
736 handle_status (void *cls,
737  const struct StatusMessage *sm)
738 {
739  struct GNUNET_DATASTORE_Handle *h = cls;
741  struct StatusContext rc;
742  const char *emsg;
743  int32_t status = ntohl (sm->status);
744 
745  qe = get_queue_head (h,
747  if (NULL == qe)
748  return;
749  rc = qe->qc.sc;
750  free_queue_entry (qe);
751  if (ntohs (sm->header.size) > sizeof (struct StatusMessage))
752  emsg = (const char *) &sm[1];
753  else
754  emsg = NULL;
756  "Received status %d/%s\n",
757  (int) status,
758  emsg);
760  gettext_noop ("# status messages received"),
761  1,
762  GNUNET_NO);
764  process_queue (h);
765  if (NULL != rc.cont)
766  rc.cont (rc.cont_cls,
767  status,
769  emsg);
770 }
771 
772 
779 static int
780 check_data (void *cls,
781  const struct DataMessage *dm)
782 {
783  uint16_t msize = ntohs (dm->header.size) - sizeof (*dm);
784 
785  if (msize != ntohl (dm->size))
786  {
787  GNUNET_break (0);
788  return GNUNET_SYSERR;
789  }
790  return GNUNET_OK;
791 }
792 
793 
800 static void
801 handle_data (void *cls,
802  const struct DataMessage *dm)
803 {
804  struct GNUNET_DATASTORE_Handle *h = cls;
806  struct ResultContext rc;
807 
808  qe = get_queue_head (h,
810  if (NULL == qe)
811  return;
812 #if INSANE_STATISTICS
814  gettext_noop ("# Results received"),
815  1,
816  GNUNET_NO);
817 #endif
819  "Received result %llu with type %u and size %u with key %s\n",
820  (unsigned long long) GNUNET_ntohll (dm->uid),
821  ntohl (dm->type),
822  ntohl (dm->size),
823  GNUNET_h2s (&dm->key));
824  rc = qe->qc.rc;
825  free_queue_entry (qe);
827  process_queue (h);
828  if (NULL != rc.proc)
829  rc.proc (rc.proc_cls,
830  &dm->key,
831  ntohl (dm->size),
832  &dm[1],
833  ntohl (dm->type),
834  ntohl (dm->priority),
835  ntohl (dm->anonymity),
836  ntohl (dm->replication),
838  GNUNET_ntohll (dm->uid));
839 }
840 
841 
849 static void
850 handle_data_end (void *cls,
851  const struct GNUNET_MessageHeader *msg)
852 {
853  struct GNUNET_DATASTORE_Handle *h = cls;
855  struct ResultContext rc;
856 
857  qe = get_queue_head (h,
859  if (NULL == qe)
860  return;
861  rc = qe->qc.rc;
862  free_queue_entry (qe);
864  "Received end of result set, new queue size is %u\n",
865  h->queue_size);
867  h->result_count = 0;
868  process_queue (h);
869  /* signal end of iteration */
870  if (NULL != rc.proc)
871  rc.proc (rc.proc_cls,
872  NULL,
873  0,
874  NULL,
875  0,
876  0,
877  0,
878  0,
880  0);
881 }
882 
883 
889 static void
890 try_reconnect (void *cls)
891 {
892  struct GNUNET_DATASTORE_Handle *h = cls;
893  struct GNUNET_MQ_MessageHandler handlers[] = {
896  struct StatusMessage,
897  h),
900  struct DataMessage,
901  h),
902  GNUNET_MQ_hd_fixed_size (data_end,
904  struct GNUNET_MessageHeader,
905  h),
907  };
908 
910  h->reconnect_task = NULL;
911  GNUNET_assert (NULL == h->mq);
912  h->mq = GNUNET_CLIENT_connect (h->cfg,
913  "datastore",
914  handlers,
916  h);
917  if (NULL == h->mq)
918  return;
920  gettext_noop ("# datastore connections (re)created"),
921  1,
922  GNUNET_NO);
924  "Reconnected to DATASTORE\n");
925  process_queue (h);
926 }
927 
928 
937 static void
939  int32_t result,
941  const char *emsg)
942 {
943  /* do nothing */
944 }
945 
946 
974  uint32_t rid,
975  const struct GNUNET_HashCode *key,
976  size_t size,
977  const void *data,
978  enum GNUNET_BLOCK_Type type,
979  uint32_t priority,
980  uint32_t anonymity,
981  uint32_t replication,
983  unsigned int queue_priority,
984  unsigned int max_queue_size,
986  void *cont_cls)
987 {
989  struct GNUNET_MQ_Envelope *env;
990  struct DataMessage *dm;
991  union QueueContext qc;
992 
993  if (size + sizeof (*dm) >= GNUNET_MAX_MESSAGE_SIZE)
994  {
995  GNUNET_break (0);
996  return NULL;
997  }
998 
1000  "Asked to put %u bytes of data under key `%s' for %s\n",
1001  size,
1002  GNUNET_h2s (key),
1004  GNUNET_YES));
1005  env = GNUNET_MQ_msg_extra (dm,
1006  size,
1008  dm->rid = htonl (rid);
1009  dm->size = htonl ((uint32_t) size);
1010  dm->type = htonl (type);
1011  dm->priority = htonl (priority);
1012  dm->anonymity = htonl (anonymity);
1013  dm->replication = htonl (replication);
1014  dm->expiration = GNUNET_TIME_absolute_hton (expiration);
1015  dm->key = *key;
1016  GNUNET_memcpy (&dm[1],
1017  data,
1018  size);
1019  qc.sc.cont = cont;
1020  qc.sc.cont_cls = cont_cls;
1021  qe = make_queue_entry (h,
1022  env,
1023  queue_priority,
1024  max_queue_size,
1026  &qc);
1027  if (NULL == qe)
1028  {
1030  "Could not create queue entry for PUT\n");
1031  return NULL;
1032  }
1034  gettext_noop ("# PUT requests executed"),
1035  1,
1036  GNUNET_NO);
1037  process_queue (h);
1038  return qe;
1039 }
1040 
1041 
1059  uint64_t amount,
1060  uint32_t entries,
1062  void *cont_cls)
1063 {
1065  struct GNUNET_MQ_Envelope *env;
1066  struct ReserveMessage *rm;
1067  union QueueContext qc;
1068 
1069  if (NULL == cont)
1070  cont = &drop_status_cont;
1072  "Asked to reserve %llu bytes of data and %u entries\n",
1073  (unsigned long long) amount,
1074  (unsigned int) entries);
1075  env = GNUNET_MQ_msg (rm,
1077  rm->entries = htonl (entries);
1078  rm->amount = GNUNET_htonll (amount);
1079 
1080  qc.sc.cont = cont;
1081  qc.sc.cont_cls = cont_cls;
1082  qe = make_queue_entry (h,
1083  env,
1084  UINT_MAX,
1085  UINT_MAX,
1087  &qc);
1088  if (NULL == qe)
1089  {
1091  "Could not create queue entry to reserve\n");
1092  return NULL;
1093  }
1095  gettext_noop ("# RESERVE requests executed"),
1096  1,
1097  GNUNET_NO);
1098  process_queue (h);
1099  return qe;
1100 }
1101 
1102 
1125  uint32_t rid,
1126  unsigned int queue_priority,
1127  unsigned int max_queue_size,
1129  void *cont_cls)
1130 {
1132  struct GNUNET_MQ_Envelope *env;
1133  struct ReleaseReserveMessage *rrm;
1134  union QueueContext qc;
1135 
1136  if (NULL == cont)
1137  cont = &drop_status_cont;
1139  "Asked to release reserve %d\n",
1140  rid);
1141  env = GNUNET_MQ_msg (rrm,
1143  rrm->rid = htonl (rid);
1144  qc.sc.cont = cont;
1145  qc.sc.cont_cls = cont_cls;
1146  qe = make_queue_entry (h,
1147  env,
1148  queue_priority,
1149  max_queue_size,
1151  &qc);
1152  if (NULL == qe)
1153  {
1155  "Could not create queue entry to release reserve\n");
1156  return NULL;
1157  }
1159  gettext_noop
1160  ("# RELEASE RESERVE requests executed"), 1,
1161  GNUNET_NO);
1162  process_queue (h);
1163  return qe;
1164 }
1165 
1166 
1189  const struct GNUNET_HashCode *key,
1190  size_t size,
1191  const void *data,
1192  unsigned int queue_priority,
1193  unsigned int max_queue_size,
1195  void *cont_cls)
1196 {
1198  struct DataMessage *dm;
1199  struct GNUNET_MQ_Envelope *env;
1200  union QueueContext qc;
1201 
1202  if (sizeof (*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1203  {
1204  GNUNET_break (0);
1205  return NULL;
1206  }
1207  if (NULL == cont)
1208  cont = &drop_status_cont;
1210  "Asked to remove %u bytes under key `%s'\n",
1211  size,
1212  GNUNET_h2s (key));
1213  env = GNUNET_MQ_msg_extra (dm,
1214  size,
1216  dm->size = htonl (size);
1217  dm->key = *key;
1218  GNUNET_memcpy (&dm[1],
1219  data,
1220  size);
1221 
1222  qc.sc.cont = cont;
1223  qc.sc.cont_cls = cont_cls;
1224 
1225  qe = make_queue_entry (h,
1226  env,
1227  queue_priority,
1228  max_queue_size,
1230  &qc);
1231  if (NULL == qe)
1232  {
1234  "Could not create queue entry for REMOVE\n");
1235  return NULL;
1236  }
1238  gettext_noop ("# REMOVE requests executed"),
1239  1,
1240  GNUNET_NO);
1241  process_queue (h);
1242  return qe;
1243 }
1244 
1245 
1246 
1267  unsigned int queue_priority,
1268  unsigned int max_queue_size,
1270  void *proc_cls)
1271 {
1273  struct GNUNET_MQ_Envelope *env;
1274  struct GNUNET_MessageHeader *m;
1275  union QueueContext qc;
1276 
1277  GNUNET_assert (NULL != proc);
1279  "Asked to get replication entry\n");
1280  env = GNUNET_MQ_msg (m,
1282  qc.rc.proc = proc;
1283  qc.rc.proc_cls = proc_cls;
1284  qe = make_queue_entry (h,
1285  env,
1286  queue_priority,
1287  max_queue_size,
1289  &qc);
1290  if (NULL == qe)
1291  {
1293  "Could not create queue entry for GET REPLICATION\n");
1294  return NULL;
1295  }
1297  gettext_noop
1298  ("# GET REPLICATION requests executed"), 1,
1299  GNUNET_NO);
1300  process_queue (h);
1301  return qe;
1302 }
1303 
1304 
1323  uint64_t next_uid,
1324  unsigned int queue_priority,
1325  unsigned int max_queue_size,
1326  enum GNUNET_BLOCK_Type type,
1328  void *proc_cls)
1329 {
1331  struct GNUNET_MQ_Envelope *env;
1332  struct GetZeroAnonymityMessage *m;
1333  union QueueContext qc;
1334 
1335  GNUNET_assert (NULL != proc);
1338  "Asked to get a zero-anonymity entry of type %d\n",
1339  type);
1340  env = GNUNET_MQ_msg (m,
1342  m->type = htonl ((uint32_t) type);
1343  m->next_uid = GNUNET_htonll (next_uid);
1344  qc.rc.proc = proc;
1345  qc.rc.proc_cls = proc_cls;
1346  qe = make_queue_entry (h,
1347  env,
1348  queue_priority,
1349  max_queue_size,
1351  &qc);
1352  if (NULL == qe)
1353  {
1355  "Could not create queue entry for zero-anonymity procation\n");
1356  return NULL;
1357  }
1359  gettext_noop
1360  ("# GET ZERO ANONYMITY requests executed"), 1,
1361  GNUNET_NO);
1362  process_queue (h);
1363  return qe;
1364 }
1365 
1366 
1387  uint64_t next_uid,
1388  bool random,
1389  const struct GNUNET_HashCode *key,
1390  enum GNUNET_BLOCK_Type type,
1391  unsigned int queue_priority,
1392  unsigned int max_queue_size,
1394  void *proc_cls)
1395 {
1397  struct GNUNET_MQ_Envelope *env;
1398  struct GetKeyMessage *gkm;
1399  struct GetMessage *gm;
1400  union QueueContext qc;
1401 
1402  GNUNET_assert (NULL != proc);
1404  "Asked to look for data of type %u under key `%s'\n",
1405  (unsigned int) type,
1406  GNUNET_h2s (key));
1407  if (NULL == key)
1408  {
1409  env = GNUNET_MQ_msg (gm,
1411  gm->type = htonl (type);
1412  gm->next_uid = GNUNET_htonll (next_uid);
1413  gm->random = random;
1414  }
1415  else
1416  {
1417  env = GNUNET_MQ_msg (gkm,
1419  gkm->type = htonl (type);
1420  gkm->next_uid = GNUNET_htonll (next_uid);
1421  gkm->random = random;
1422  gkm->key = *key;
1423  }
1424  qc.rc.proc = proc;
1425  qc.rc.proc_cls = proc_cls;
1426  qe = make_queue_entry (h,
1427  env,
1428  queue_priority,
1429  max_queue_size,
1431  &qc);
1432  if (NULL == qe)
1433  {
1435  "Could not queue request for `%s'\n",
1436  GNUNET_h2s (key));
1437  return NULL;
1438  }
1439 #if INSANE_STATISTICS
1441  gettext_noop ("# GET requests executed"),
1442  1,
1443  GNUNET_NO);
1444 #endif
1445  process_queue (h);
1446  return qe;
1447 }
1448 
1449 
1456 void
1458 {
1459  struct GNUNET_DATASTORE_Handle *h = qe->h;
1460 
1462  "Pending DATASTORE request %p cancelled (%d, %d)\n",
1463  qe,
1464  NULL == qe->env,
1465  h->queue_head == qe);
1466  if (NULL == qe->env)
1467  {
1468  free_queue_entry (qe);
1469  h->skip_next_messages++;
1470  return;
1471  }
1472  free_queue_entry (qe);
1473  process_queue (h);
1474 }
1475 
1476 
1477 /* end of datastore_api.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_DATASTORE_STATUS
Message sent by datastore to client informing about status processing a request (in response to RESER...
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
unsigned int queue_size
Number of entries in the queue.
static struct GNUNET_DATASTORE_QueueEntry * qe
Current operation.
uint32_t random
If true return a random result.
Definition: datastore.h:129
struct GNUNET_MQ_Handle * mq
Current connection to the datastore service.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static char * expiration
Credential TTL.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:670
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
struct GNUNET_TIME_AbsoluteNBO expiration
Expiration time (NBO); zero for remove.
Definition: datastore.h:252
uint32_t type
Desired content type (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:182
uint32_t entries
Number of items to reserve.
Definition: datastore.h:49
Any type of block, used as a wildcard when searching.
Message from datastore service informing client about the current size of the datastore.
Definition: datastore.h:39
struct GNUNET_DATASTORE_QueueEntry * prev
This is a linked list.
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:901
struct GNUNET_DATASTORE_QueueEntry * queue_tail
Current tail of priority queue.
struct GNUNET_DATASTORE_Handle * h
Handle to the master context.
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
static void try_reconnect(void *cls)
Try reconnecting to the datastore service.
uint32_t type
Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:219
GNUNET_MQ_Error
Error codes for the queue.
static void process_queue(struct GNUNET_DATASTORE_Handle *h)
Process entries in the queue (or do nothing if we are already doing so).
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
struct GNUNET_STATISTICS_Handle * stats
Handle for statistics.
struct GNUNET_MQ_Envelope * env
Envelope of the request to transmit, NULL after transmission.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
void * proc_cls
Closure for proc.
Definition: datastore_api.c:82
uint32_t random
If true return a random result.
Definition: datastore.h:163
static void delay_warning(void *cls)
Task that logs an error after some time.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Reserve space in the datastore.
unsigned int result_count
Number of results we're receiving for the current query after the application stopped caring...
Message from datastore client informing service that the remainder of the reserved bytes can now be r...
Definition: datastore.h:90
static unsigned int replication
GNUNET_DATASTORE_ContinuationWithStatus cont
Function to call after transmission of the request.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
struct GNUNET_SCHEDULER_Task * reconnect_task
Task for trying to reconnect.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION
Message sent by datastore client to get random data.
Message to the datastore service asking about specific content.
Definition: datastore.h:109
struct GNUNET_TIME_AbsoluteNBO min_expiration
Minimum expiration time required for content to be stored by the datacache at this time...
Definition: datastore.h:80
#define GNUNET_NO
Definition: gnunet_common.h:81
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:78
static void handle_data(void *cls, const struct DataMessage *dm)
Handle data message we got from the service.
Message to the datastore service asking about zero anonymity content.
Definition: datastore.h:172
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static struct GNUNET_DATASTORE_QueueEntry * get_queue_head(struct GNUNET_DATASTORE_Handle *h, uint16_t response_type)
Get the entry at the head of the message queue.
static void handle_status(void *cls, const struct StatusMessage *sm)
Function called to handle status message from the service.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:158
#define GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_PUT
Message sent by datastore client to store data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Signal that all of the data for which a reservation was made has been stored and that whatever excess...
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
Handle for the service.
const struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
static void drop_status_cont(void *cls, int32_t result, struct GNUNET_TIME_Absolute min_expiration, const char *emsg)
Dummy continuation used to do nothing (but be non-zero).
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:94
#define _(String)
GNU gettext support macro.
Definition: platform.h:208
struct StatusContext sc
Definition: datastore_api.c:93
#define GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE
Message sent by datastore client on join.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:99
structs for communication between datastore service and API
GNUNET_DATASTORE_DatumProcessor proc
Function to call with the result.
Definition: datastore_api.c:77
struct ResultContext rc
Definition: datastore_api.c:95
unsigned int max_queue
Maximum allowed length of queue (otherwise this request should be discarded).
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1246
static void free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
Free a queue entry.
Entry in our priority queue.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP
Message sent by datastore client to drop the database.
#define GNUNET_memcpy(dst, src, n)
void * cls
Closure for mv and cb.
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:774
unsigned int skip_next_messages
We should ignore the next message(s) from the service.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
Message to the datastore service asking about specific content.
Definition: datastore.h:143
uint32_t type
Desired content type.
Definition: datastore.h:119
struct ListEntry * entries
List of peers in the list.
GNUNET_DATASTORE_ContinuationWithStatus cont
Continuation to call with the status.
Definition: datastore_api.c:59
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:727
void(* GNUNET_DATASTORE_DatumProcessor)(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process a datum that was stored in the datastore.
uint16_t status
See PRISM_STATUS_*-constants.
static int result
Global testing status.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
static void disconnect_after_drop(void *cls)
Task used to disconnect from the datastore after we send the GNUNET_MESSAGE_TYPE_DATASTORE_DROP me...
A 512-bit hashcode.
Message handler for a specific message type.
uint32_t type
Desired content type.
Definition: datastore.h:153
#define LOG(kind,...)
Definition: datastore_api.c:34
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END
Message sent by datastore to client signaling end of matching data.
uint32_t replication
Desired replication level.
Definition: datastore.h:234
static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry(struct GNUNET_DATASTORE_Handle *h, struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, uint16_t expected_type, const union QueueContext *qc)
Create a new entry for our priority queue (and possibly discard other entries if the queue is getting...
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:321
uint32_t rid
Reservation ID to use; use zero for none.
Definition: datastore.h:209
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:104
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
union QueueContext qc
Context for the operation.
struct GNUNET_SCHEDULER_Task * delay_warn_task
Task we run if this entry stalls the queue and we need to warn the user.
Message transmitting content from or to the datastore service.
Definition: datastore.h:196
struct GNUNET_HashCode key
Desired key.
Definition: datastore.h:134
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:79
static unsigned int size
Size of the "table".
Definition: peer.c:67
Context for a queue operation.
Definition: datastore_api.c:90
uint32_t size
Number of bytes in the item (NBO).
Definition: datastore.h:214
uint32_t anonymity
Desired anonymity level (NBO), zero for remove.
Definition: datastore.h:229
void(* GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, int32_t success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static struct GNUNET_FS_SearchContext * sc
Definition: gnunet-search.c:37
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
int32_t rid
Reservation id.
Definition: datastore.h:100
Context for processing result messages.
Definition: datastore_api.c:72
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message...
struct GNUNET_MessageHeader header
Type is either GNUNET_MESSAGE_TYPE_DATASTORE_PUT, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE or GNUNET_MESS...
Definition: datastore.h:204
static void disconnect_on_mq_error(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
Handle to a message queue.
Definition: mq.c:85
static int check_data(void *cls, const struct DataMessage *dm)
Check data message we received from the service.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:187
uint64_t uid
Unique ID for the content (can be used for UPDATE); can be zero for remove (which indicates that the ...
Definition: datastore.h:247
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY
Message sent by datastore client to get random data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY
Message sent by datastore client to get data by key.
Configuration data.
Definition: configuration.c:85
void GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, int drop)
Disconnect from the datastore service (and free associated resources).
struct GNUNET_HashCode key
Key under which the item can be found.
Definition: datastore.h:257
static void do_disconnect(struct GNUNET_DATASTORE_Handle *h)
Disconnect from the service and then try reconnecting to the datastore service after some delay...
uint32_t priority
Priority of the item (NBO), zero for remove.
Definition: datastore.h:224
Handle to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * queue_head
Current head of priority queue.
static void handle_data_end(void *cls, const struct GNUNET_MessageHeader *msg)
Type of a function to call when we receive a GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the ...
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:134
Message from datastore service informing client about the success or failure of a requested operation...
Definition: datastore.h:64
struct GNUNET_TIME_Relative retry_time
How quickly should we retry? Used for exponential back-off on connect-errors.
uint16_t response_type
Expected response type.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
The type of this model.
void * cont_cls
Closure for cont.
Header for all communications.
Time for absolute times used by GNUnet, in microseconds.
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:824
#define GNUNET_YES
Definition: gnunet_common.h:80
static unsigned int anonymity
int32_t status
Status code, -1 for errors.
Definition: datastore.h:74
void * cont_cls
Closure for cont.
Definition: datastore_api.c:64
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:353
struct GNUNET_DATASTORE_QueueEntry * next
This is a linked list.
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE
Message sent by datastore client to remove data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:124
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET
Message sent by datastore client to get data.
uint32_t data
The data value.
struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the datastore service.
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:654
unsigned int priority
Priority in the queue.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_DATASTORE_STATUS.
Definition: datastore.h:69
static int check_status(void *cls, const struct StatusMessage *sm)
Function called to check status message from the service.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define DELAY_WARN_TIMEOUT
Definition: datastore_api.c:36
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, unsigned int queue_priority, unsigned int max_queue_size, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a single zero-anonymity value from the datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA
Message sent by datastore to client providing requested data (in response to GET or GET_RANDOM reques...
Context for processing status messages.
Definition: datastore_api.c:54
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
#define GNUNET_free(ptr)
Wrapper around free.
Time for relative time used by GNUnet, in microseconds.
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:965
uint64_t amount
Number of bytes to reserve.
Definition: datastore.h:54