GNUnet  0.10.x
datastore_api.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet
3  Copyright (C) 2004-2013, 2016 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
20 
27 #include "platform.h"
28 #include "gnunet_arm_service.h"
29 #include "gnunet_constants.h"
32 #include "datastore.h"
33 
34 #define LOG(kind, ...) GNUNET_log_from(kind, "datastore-api", __VA_ARGS__)
35 
36 #define DELAY_WARN_TIMEOUT GNUNET_TIME_UNIT_MINUTES
37 
41 #define INSANE_STATISTICS GNUNET_NO
42 
49 #define MAX_EXCESS_RESULTS 8
50 
54 struct StatusContext {
59 
63  void *cont_cls;
64 };
65 
66 
70 struct ResultContext {
75 
79  void *proc_cls;
80 };
81 
82 
86 union QueueContext {
87  struct StatusContext sc;
88 
89  struct ResultContext rc;
90 };
91 
92 
101 
106 
111 
116 
120  void *cont_cls;
121 
125  union QueueContext qc;
126 
132 
138 
142  unsigned int priority;
143 
148  unsigned int max_queue;
149 
153  uint16_t response_type;
154 };
155 
156 
165 
170 
175 
180 
185 
190 
195  struct GNUNET_TIME_Relative retry_time;
196 
200  unsigned int queue_size;
201 
207  unsigned int result_count;
208 
212  unsigned int skip_next_messages;
213 };
214 
215 
221 static void
222 try_reconnect(void *cls);
223 
224 
231 static void
233 {
234  if (NULL == h->mq)
235  {
236  GNUNET_break(0);
237  return;
238  }
239  GNUNET_MQ_destroy(h->mq);
240  h->mq = NULL;
241  h->skip_next_messages = 0;
242  h->reconnect_task
244  &try_reconnect,
245  h);
246 }
247 
248 
256 static void
258 {
259  struct GNUNET_DATASTORE_Handle *h = qe->h;
260 
262  h->queue_tail,
263  qe);
264  h->queue_size--;
265  if (NULL != qe->env)
266  GNUNET_MQ_discard(qe->env);
267  if (NULL != qe->delay_warn_task)
269  GNUNET_free(qe);
270 }
271 
272 
278 static void
279 delay_warning(void *cls)
280 {
281  struct GNUNET_DATASTORE_QueueEntry *qe = cls;
282 
283  qe->delay_warn_task = NULL;
285  "Request %p of type %u at head of datastore queue for more than %s\n",
286  qe,
287  (unsigned int)qe->response_type,
289  GNUNET_YES));
291  &delay_warning,
292  qe);
293 }
294 
295 
302 static void
304  enum GNUNET_MQ_Error error)
305 {
306  struct GNUNET_DATASTORE_Handle *h = cls;
308 
310  "MQ error, reconnecting to DATASTORE\n");
311  do_disconnect(h);
312  qe = h->queue_head;
313  if (NULL == qe)
314  return;
315  if (NULL != qe->delay_warn_task)
316  {
318  qe->delay_warn_task = NULL;
319  }
320  if (NULL == qe->env)
321  {
322  union QueueContext qc = qe->qc;
323  uint16_t rt = qe->response_type;
324 
326  "Failed to receive response from database.\n");
327  free_queue_entry(qe);
328  switch (rt)
329  {
331  if (NULL != qc.sc.cont)
332  qc.sc.cont(qc.sc.cont_cls,
335  _("DATASTORE disconnected"));
336  break;
337 
339  if (NULL != qc.rc.proc)
340  qc.rc.proc(qc.rc.proc_cls,
341  NULL,
342  0,
343  NULL,
344  0,
345  0,
346  0,
347  0,
349  0);
350  break;
351 
352  default:
353  GNUNET_break(0);
354  }
355  }
356 }
357 
358 
367 {
368  struct GNUNET_DATASTORE_Handle *h;
369 
371  "Establishing DATASTORE connection!\n");
373  h->cfg = cfg;
374  try_reconnect(h);
375  if (NULL == h->mq)
376  {
377  GNUNET_free(h);
378  return NULL;
379  }
380  h->stats = GNUNET_STATISTICS_create("datastore-api",
381  cfg);
382  return h;
383 }
384 
385 
392 static void
394 {
395  struct GNUNET_DATASTORE_Handle *h = cls;
396 
398  "Drop sent, disconnecting\n");
400  GNUNET_NO);
401 }
402 
403 
410 static void
412  enum GNUNET_MQ_Error error)
413 {
414  struct GNUNET_DATASTORE_Handle *h = cls;
415 
417  "Failed to ask datastore to drop tables\n");
419  GNUNET_NO);
420 }
421 
422 
430 void
432  int drop)
433 {
435 
437  "Datastore disconnect\n");
438  if (NULL != h->mq)
439  {
440  GNUNET_MQ_destroy(h->mq);
441  h->mq = NULL;
442  }
443  if (NULL != h->reconnect_task)
444  {
446  h->reconnect_task = NULL;
447  }
448  while (NULL != (qe = h->queue_head))
449  {
450  switch (qe->response_type)
451  {
453  if (NULL != qe->qc.sc.cont)
454  qe->qc.sc.cont(qe->qc.sc.cont_cls,
457  _("Disconnected from DATASTORE"));
458  break;
459 
461  if (NULL != qe->qc.rc.proc)
462  qe->qc.rc.proc(qe->qc.rc.proc_cls,
463  NULL,
464  0,
465  NULL,
466  0,
467  0,
468  0,
469  0,
471  0);
472  break;
473 
474  default:
475  GNUNET_break(0);
476  }
477  free_queue_entry(qe);
478  }
479  if (GNUNET_YES == drop)
480  {
482  "Re-connecting to issue DROP!\n");
483  GNUNET_assert(NULL == h->mq);
484  h->mq = GNUNET_CLIENT_connect(h->cfg,
485  "datastore",
486  NULL,
488  h);
489  if (NULL != h->mq)
490  {
491  struct GNUNET_MessageHeader *hdr;
492  struct GNUNET_MQ_Envelope *env;
493 
494  env = GNUNET_MQ_msg(hdr,
498  h);
499  GNUNET_MQ_send(h->mq,
500  env);
501  return;
502  }
503  GNUNET_break(0);
504  }
506  GNUNET_NO);
507  h->stats = NULL;
508  GNUNET_free(h);
509 }
510 
511 
527 static struct GNUNET_DATASTORE_QueueEntry *
529  struct GNUNET_MQ_Envelope *env,
530  unsigned int queue_priority,
531  unsigned int max_queue_size,
532  uint16_t expected_type,
533  const union QueueContext *qc)
534 {
536  struct GNUNET_DATASTORE_QueueEntry *pos;
537  unsigned int c;
538 
539  if ((NULL != h->queue_tail) &&
540  (h->queue_tail->priority >= queue_priority))
541  {
542  c = h->queue_size;
543  pos = NULL;
544  }
545  else
546  {
547  c = 0;
548  pos = h->queue_head;
549  }
550  while ((NULL != pos) &&
551  (c < max_queue_size) &&
552  (pos->priority >= queue_priority))
553  {
554  c++;
555  pos = pos->next;
556  }
557  if (c >= max_queue_size)
558  {
560  gettext_noop("# queue overflows"),
561  1,
562  GNUNET_NO);
563  GNUNET_MQ_discard(env);
564  return NULL;
565  }
567  qe->h = h;
568  qe->env = env;
569  qe->response_type = expected_type;
570  qe->qc = *qc;
571  qe->priority = queue_priority;
572  qe->max_queue = max_queue_size;
573  if (NULL == pos)
574  {
575  /* append at the tail */
576  pos = h->queue_tail;
577  }
578  else
579  {
580  pos = pos->prev;
581  /* do not insert at HEAD if HEAD query was already
582  * transmitted and we are still receiving replies! */
583  if ((NULL == pos) &&
584  (NULL == h->queue_head->env))
585  pos = h->queue_head;
586  }
587  c++;
588 #if INSANE_STATISTICS
590  gettext_noop("# queue entries created"),
591  1,
592  GNUNET_NO);
593 #endif
595  h->queue_tail,
596  pos,
597  qe);
598  h->queue_size++;
599  return qe;
600 }
601 
602 
609 static void
611 {
613 
614  if (NULL == (qe = h->queue_head))
615  {
616  /* no entry in queue */
618  "Queue empty\n");
619  return;
620  }
621  if (NULL == qe->env)
622  {
623  /* waiting for replies */
625  "Head request already transmitted\n");
626  return;
627  }
628  if (NULL == h->mq)
629  {
630  /* waiting for reconnect */
632  "Not connected\n");
633  return;
634  }
635  GNUNET_assert(NULL == qe->delay_warn_task);
637  &delay_warning,
638  qe);
639  GNUNET_MQ_send(h->mq,
640  qe->env);
641  qe->env = NULL;
642 }
643 
644 
652 static struct GNUNET_DATASTORE_QueueEntry *
654  uint16_t response_type)
655 {
657 
658  if (h->skip_next_messages > 0)
659  {
660  h->skip_next_messages--;
661  process_queue(h);
662  return NULL;
663  }
664  qe = h->queue_head;
665  if (NULL == qe)
666  {
667  GNUNET_break(0);
668  do_disconnect(h);
669  return NULL;
670  }
671  if (NULL != qe->env)
672  {
673  GNUNET_break(0);
674  do_disconnect(h);
675  return NULL;
676  }
677  if (response_type != qe->response_type)
678  {
679  GNUNET_break(0);
680  do_disconnect(h);
681  return NULL;
682  }
683  return qe;
684 }
685 
686 
694 static int
695 check_status(void *cls,
696  const struct StatusMessage *sm)
697 {
698  uint16_t msize = ntohs(sm->header.size) - sizeof(*sm);
699  int32_t status = ntohl(sm->status);
700 
701  if (msize > 0)
702  {
703  const char *emsg = (const char *)&sm[1];
704 
705  if ('\0' != emsg[msize - 1])
706  {
707  GNUNET_break(0);
708  return GNUNET_SYSERR;
709  }
710  }
711  else if (GNUNET_SYSERR == status)
712  {
713  GNUNET_break(0);
714  return GNUNET_SYSERR;
715  }
716  return GNUNET_OK;
717 }
718 
719 
726 static void
727 handle_status(void *cls,
728  const struct StatusMessage *sm)
729 {
730  struct GNUNET_DATASTORE_Handle *h = cls;
732  struct StatusContext rc;
733  const char *emsg;
734  int32_t status = ntohl(sm->status);
735 
736  qe = get_queue_head(h,
738  if (NULL == qe)
739  return;
740  rc = qe->qc.sc;
741  free_queue_entry(qe);
742  if (ntohs(sm->header.size) > sizeof(struct StatusMessage))
743  emsg = (const char *)&sm[1];
744  else
745  emsg = NULL;
747  "Received status %d/%s\n",
748  (int)status,
749  emsg);
751  gettext_noop("# status messages received"),
752  1,
753  GNUNET_NO);
755  process_queue(h);
756  if (NULL != rc.cont)
757  rc.cont(rc.cont_cls,
758  status,
760  emsg);
761 }
762 
763 
770 static int
771 check_data(void *cls,
772  const struct DataMessage *dm)
773 {
774  uint16_t msize = ntohs(dm->header.size) - sizeof(*dm);
775 
776  if (msize != ntohl(dm->size))
777  {
778  GNUNET_break(0);
779  return GNUNET_SYSERR;
780  }
781  return GNUNET_OK;
782 }
783 
784 
791 static void
792 handle_data(void *cls,
793  const struct DataMessage *dm)
794 {
795  struct GNUNET_DATASTORE_Handle *h = cls;
797  struct ResultContext rc;
798 
799  qe = get_queue_head(h,
801  if (NULL == qe)
802  return;
803 #if INSANE_STATISTICS
805  gettext_noop("# Results received"),
806  1,
807  GNUNET_NO);
808 #endif
810  "Received result %llu with type %u and size %u with key %s\n",
811  (unsigned long long)GNUNET_ntohll(dm->uid),
812  ntohl(dm->type),
813  ntohl(dm->size),
814  GNUNET_h2s(&dm->key));
815  rc = qe->qc.rc;
816  free_queue_entry(qe);
818  process_queue(h);
819  if (NULL != rc.proc)
820  rc.proc(rc.proc_cls,
821  &dm->key,
822  ntohl(dm->size),
823  &dm[1],
824  ntohl(dm->type),
825  ntohl(dm->priority),
826  ntohl(dm->anonymity),
827  ntohl(dm->replication),
829  GNUNET_ntohll(dm->uid));
830 }
831 
832 
840 static void
841 handle_data_end(void *cls,
842  const struct GNUNET_MessageHeader *msg)
843 {
844  struct GNUNET_DATASTORE_Handle *h = cls;
846  struct ResultContext rc;
847 
848  qe = get_queue_head(h,
850  if (NULL == qe)
851  return;
852  rc = qe->qc.rc;
853  free_queue_entry(qe);
855  "Received end of result set, new queue size is %u\n",
856  h->queue_size);
858  h->result_count = 0;
859  process_queue(h);
860  /* signal end of iteration */
861  if (NULL != rc.proc)
862  rc.proc(rc.proc_cls,
863  NULL,
864  0,
865  NULL,
866  0,
867  0,
868  0,
869  0,
871  0);
872 }
873 
874 
880 static void
881 try_reconnect(void *cls)
882 {
883  struct GNUNET_DATASTORE_Handle *h = cls;
884  struct GNUNET_MQ_MessageHandler handlers[] = {
887  struct StatusMessage,
888  h),
891  struct DataMessage,
892  h),
893  GNUNET_MQ_hd_fixed_size(data_end,
895  struct GNUNET_MessageHeader,
896  h),
898  };
899 
901  h->reconnect_task = NULL;
902  GNUNET_assert(NULL == h->mq);
903  h->mq = GNUNET_CLIENT_connect(h->cfg,
904  "datastore",
905  handlers,
907  h);
908  if (NULL == h->mq)
909  return;
911  gettext_noop("# datastore connections (re)created"),
912  1,
913  GNUNET_NO);
915  "Reconnected to DATASTORE\n");
916  process_queue(h);
917 }
918 
919 
928 static void
930  int32_t result,
932  const char *emsg)
933 {
934  /* do nothing */
935 }
936 
937 
965  uint32_t rid,
966  const struct GNUNET_HashCode *key,
967  size_t size,
968  const void *data,
969  enum GNUNET_BLOCK_Type type,
970  uint32_t priority,
971  uint32_t anonymity,
972  uint32_t replication,
974  unsigned int queue_priority,
975  unsigned int max_queue_size,
977  void *cont_cls)
978 {
980  struct GNUNET_MQ_Envelope *env;
981  struct DataMessage *dm;
982  union QueueContext qc;
983 
984  if (size + sizeof(*dm) >= GNUNET_MAX_MESSAGE_SIZE)
985  {
986  GNUNET_break(0);
987  return NULL;
988  }
989 
991  "Asked to put %u bytes of data under key `%s' for %s\n",
992  size,
993  GNUNET_h2s(key),
995  GNUNET_YES));
996  env = GNUNET_MQ_msg_extra(dm,
997  size,
999  dm->rid = htonl(rid);
1000  dm->size = htonl((uint32_t)size);
1001  dm->type = htonl(type);
1002  dm->priority = htonl(priority);
1003  dm->anonymity = htonl(anonymity);
1004  dm->replication = htonl(replication);
1005  dm->expiration = GNUNET_TIME_absolute_hton(expiration);
1006  dm->key = *key;
1007  GNUNET_memcpy(&dm[1],
1008  data,
1009  size);
1010  qc.sc.cont = cont;
1011  qc.sc.cont_cls = cont_cls;
1012  qe = make_queue_entry(h,
1013  env,
1014  queue_priority,
1015  max_queue_size,
1017  &qc);
1018  if (NULL == qe)
1019  {
1021  "Could not create queue entry for PUT\n");
1022  return NULL;
1023  }
1025  gettext_noop("# PUT requests executed"),
1026  1,
1027  GNUNET_NO);
1028  process_queue(h);
1029  return qe;
1030 }
1031 
1032 
1050  uint64_t amount,
1051  uint32_t entries,
1053  void *cont_cls)
1054 {
1056  struct GNUNET_MQ_Envelope *env;
1057  struct ReserveMessage *rm;
1058  union QueueContext qc;
1059 
1060  if (NULL == cont)
1061  cont = &drop_status_cont;
1063  "Asked to reserve %llu bytes of data and %u entries\n",
1064  (unsigned long long)amount,
1065  (unsigned int)entries);
1066  env = GNUNET_MQ_msg(rm,
1068  rm->entries = htonl(entries);
1069  rm->amount = GNUNET_htonll(amount);
1070 
1071  qc.sc.cont = cont;
1072  qc.sc.cont_cls = cont_cls;
1073  qe = make_queue_entry(h,
1074  env,
1075  UINT_MAX,
1076  UINT_MAX,
1078  &qc);
1079  if (NULL == qe)
1080  {
1082  "Could not create queue entry to reserve\n");
1083  return NULL;
1084  }
1086  gettext_noop("# RESERVE requests executed"),
1087  1,
1088  GNUNET_NO);
1089  process_queue(h);
1090  return qe;
1091 }
1092 
1093 
1116  uint32_t rid,
1117  unsigned int queue_priority,
1118  unsigned int max_queue_size,
1120  void *cont_cls)
1121 {
1123  struct GNUNET_MQ_Envelope *env;
1124  struct ReleaseReserveMessage *rrm;
1125  union QueueContext qc;
1126 
1127  if (NULL == cont)
1128  cont = &drop_status_cont;
1130  "Asked to release reserve %d\n",
1131  rid);
1132  env = GNUNET_MQ_msg(rrm,
1134  rrm->rid = htonl(rid);
1135  qc.sc.cont = cont;
1136  qc.sc.cont_cls = cont_cls;
1137  qe = make_queue_entry(h,
1138  env,
1139  queue_priority,
1140  max_queue_size,
1142  &qc);
1143  if (NULL == qe)
1144  {
1146  "Could not create queue entry to release reserve\n");
1147  return NULL;
1148  }
1150  gettext_noop
1151  ("# RELEASE RESERVE requests executed"), 1,
1152  GNUNET_NO);
1153  process_queue(h);
1154  return qe;
1155 }
1156 
1157 
1180  const struct GNUNET_HashCode *key,
1181  size_t size,
1182  const void *data,
1183  unsigned int queue_priority,
1184  unsigned int max_queue_size,
1186  void *cont_cls)
1187 {
1189  struct DataMessage *dm;
1190  struct GNUNET_MQ_Envelope *env;
1191  union QueueContext qc;
1192 
1193  if (sizeof(*dm) + size >= GNUNET_MAX_MESSAGE_SIZE)
1194  {
1195  GNUNET_break(0);
1196  return NULL;
1197  }
1198  if (NULL == cont)
1199  cont = &drop_status_cont;
1201  "Asked to remove %u bytes under key `%s'\n",
1202  size,
1203  GNUNET_h2s(key));
1204  env = GNUNET_MQ_msg_extra(dm,
1205  size,
1207  dm->size = htonl(size);
1208  dm->key = *key;
1209  GNUNET_memcpy(&dm[1],
1210  data,
1211  size);
1212 
1213  qc.sc.cont = cont;
1214  qc.sc.cont_cls = cont_cls;
1215 
1216  qe = make_queue_entry(h,
1217  env,
1218  queue_priority,
1219  max_queue_size,
1221  &qc);
1222  if (NULL == qe)
1223  {
1225  "Could not create queue entry for REMOVE\n");
1226  return NULL;
1227  }
1229  gettext_noop("# REMOVE requests executed"),
1230  1,
1231  GNUNET_NO);
1232  process_queue(h);
1233  return qe;
1234 }
1235 
1236 
1237 
1258  unsigned int queue_priority,
1259  unsigned int max_queue_size,
1261  void *proc_cls)
1262 {
1264  struct GNUNET_MQ_Envelope *env;
1265  struct GNUNET_MessageHeader *m;
1266  union QueueContext qc;
1267 
1268  GNUNET_assert(NULL != proc);
1270  "Asked to get replication entry\n");
1271  env = GNUNET_MQ_msg(m,
1273  qc.rc.proc = proc;
1274  qc.rc.proc_cls = proc_cls;
1275  qe = make_queue_entry(h,
1276  env,
1277  queue_priority,
1278  max_queue_size,
1280  &qc);
1281  if (NULL == qe)
1282  {
1284  "Could not create queue entry for GET REPLICATION\n");
1285  return NULL;
1286  }
1288  gettext_noop
1289  ("# GET REPLICATION requests executed"), 1,
1290  GNUNET_NO);
1291  process_queue(h);
1292  return qe;
1293 }
1294 
1295 
1314  uint64_t next_uid,
1315  unsigned int queue_priority,
1316  unsigned int max_queue_size,
1317  enum GNUNET_BLOCK_Type type,
1319  void *proc_cls)
1320 {
1322  struct GNUNET_MQ_Envelope *env;
1323  struct GetZeroAnonymityMessage *m;
1324  union QueueContext qc;
1325 
1326  GNUNET_assert(NULL != proc);
1329  "Asked to get a zero-anonymity entry of type %d\n",
1330  type);
1331  env = GNUNET_MQ_msg(m,
1333  m->type = htonl((uint32_t)type);
1334  m->next_uid = GNUNET_htonll(next_uid);
1335  qc.rc.proc = proc;
1336  qc.rc.proc_cls = proc_cls;
1337  qe = make_queue_entry(h,
1338  env,
1339  queue_priority,
1340  max_queue_size,
1342  &qc);
1343  if (NULL == qe)
1344  {
1346  "Could not create queue entry for zero-anonymity procation\n");
1347  return NULL;
1348  }
1350  gettext_noop
1351  ("# GET ZERO ANONYMITY requests executed"), 1,
1352  GNUNET_NO);
1353  process_queue(h);
1354  return qe;
1355 }
1356 
1357 
1378  uint64_t next_uid,
1379  bool random,
1380  const struct GNUNET_HashCode *key,
1381  enum GNUNET_BLOCK_Type type,
1382  unsigned int queue_priority,
1383  unsigned int max_queue_size,
1385  void *proc_cls)
1386 {
1388  struct GNUNET_MQ_Envelope *env;
1389  struct GetKeyMessage *gkm;
1390  struct GetMessage *gm;
1391  union QueueContext qc;
1392 
1393  GNUNET_assert(NULL != proc);
1395  "Asked to look for data of type %u under key `%s'\n",
1396  (unsigned int)type,
1397  GNUNET_h2s(key));
1398  if (NULL == key)
1399  {
1400  env = GNUNET_MQ_msg(gm,
1402  gm->type = htonl(type);
1403  gm->next_uid = GNUNET_htonll(next_uid);
1404  gm->random = random;
1405  }
1406  else
1407  {
1408  env = GNUNET_MQ_msg(gkm,
1410  gkm->type = htonl(type);
1411  gkm->next_uid = GNUNET_htonll(next_uid);
1412  gkm->random = random;
1413  gkm->key = *key;
1414  }
1415  qc.rc.proc = proc;
1416  qc.rc.proc_cls = proc_cls;
1417  qe = make_queue_entry(h,
1418  env,
1419  queue_priority,
1420  max_queue_size,
1422  &qc);
1423  if (NULL == qe)
1424  {
1426  "Could not queue request for `%s'\n",
1427  GNUNET_h2s(key));
1428  return NULL;
1429  }
1430 #if INSANE_STATISTICS
1432  gettext_noop("# GET requests executed"),
1433  1,
1434  GNUNET_NO);
1435 #endif
1436  process_queue(h);
1437  return qe;
1438 }
1439 
1440 
1447 void
1449 {
1450  struct GNUNET_DATASTORE_Handle *h = qe->h;
1451 
1453  "Pending DATASTORE request %p cancelled (%d, %d)\n",
1454  qe,
1455  NULL == qe->env,
1456  h->queue_head == qe);
1457  if (NULL == qe->env)
1458  {
1459  free_queue_entry(qe);
1460  h->skip_next_messages++;
1461  return;
1462  }
1463  free_queue_entry(qe);
1464  process_queue(h);
1465 }
1466 
1467 
1468 /* end of datastore_api.c */
#define GNUNET_CONTAINER_DLL_remove(head, tail, element)
Remove an element from a DLL.
#define GNUNET_MESSAGE_TYPE_DATASTORE_STATUS
Message sent by datastore to client informing about status processing a request (in response to RESER...
static struct GNUNET_TIME_Absolute min_expiration
Minimum time that content should have to not be discarded instantly (time stamp of any content that w...
unsigned int queue_size
Number of entries in the queue.
static struct GNUNET_DATASTORE_QueueEntry * qe
Current operation.
uint32_t random
If true return a random result.
Definition: datastore.h:123
struct GNUNET_MQ_Handle * mq
Current connection to the datastore service.
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
static char * expiration
Credential TTL.
struct GNUNET_TIME_Absolute GNUNET_TIME_absolute_ntoh(struct GNUNET_TIME_AbsoluteNBO a)
Convert absolute time from network byte order.
Definition: time.c:671
#define GNUNET_TIME_UNIT_ZERO_ABS
Absolute time zero.
struct GNUNET_TIME_AbsoluteNBO expiration
Expiration time (NBO); zero for remove.
Definition: datastore.h:240
uint32_t type
Desired content type (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:172
uint32_t entries
Number of items to reserve.
Definition: datastore.h:48
Any type of block, used as a wildcard when searching.
Message from datastore service informing client about the current size of the datastore.
Definition: datastore.h:39
struct GNUNET_DATASTORE_QueueEntry * prev
This is a linked list.
struct GNUNET_MQ_Handle * GNUNET_CLIENT_connect(const struct GNUNET_CONFIGURATION_Handle *cfg, const char *service_name, const struct GNUNET_MQ_MessageHandler *handlers, GNUNET_MQ_ErrorHandler error_handler, void *error_handler_cls)
Create a message queue to connect to a GNUnet service.
Definition: client.c:900
struct GNUNET_DATASTORE_QueueEntry * queue_tail
Current tail of priority queue.
struct GNUNET_DATASTORE_Handle * h
Handle to the master context.
GNUNET_BLOCK_Type
Blocks in the datastore and the datacache must have a unique type.
static void try_reconnect(void *cls)
Try reconnecting to the datastore service.
uint32_t type
Type of the item (NBO), zero for remove, (actually an enum GNUNET_BLOCK_Type)
Definition: datastore.h:207
GNUNET_MQ_Error
Error codes for the queue.
static void process_queue(struct GNUNET_DATASTORE_Handle *h)
Process entries in the queue (or do nothing if we are already doing so).
struct GNUNET_STATISTICS_Handle * GNUNET_STATISTICS_create(const char *subsystem, const struct GNUNET_CONFIGURATION_Handle *cfg)
Get handle for the statistics service.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_put(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Store an item in the datastore.
struct GNUNET_STATISTICS_Handle * stats
Handle for statistics.
struct GNUNET_MQ_Envelope * env
Envelope of the request to transmit, NULL after transmission.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
void * proc_cls
Closure for proc.
Definition: datastore_api.c:79
uint32_t random
If true return a random result.
Definition: datastore.h:155
static void delay_warning(void *cls)
Task that logs an error after some time.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_reserve(struct GNUNET_DATASTORE_Handle *h, uint64_t amount, uint32_t entries, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Reserve space in the datastore.
unsigned int result_count
Number of results we're receiving for the current query after application stopped to care...
Message from datastore client informing service that the remainder of the reserved bytes can now be r...
Definition: datastore.h:87
static unsigned int replication
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
GNUNET_DATASTORE_ContinuationWithStatus cont
Function to call after transmission of the request.
#define GNUNET_MQ_hd_fixed_size(name, code, str, ctx)
struct GNUNET_SCHEDULER_Task * reconnect_task
Task for trying to reconnect.
#define GNUNET_MQ_msg(mvar, type)
Allocate a GNUNET_MQ_Envelope.
Definition: gnunet_mq_lib.h:67
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_REPLICATION
Message sent by datastore client to get random data.
Message to the datastore service asking about specific content.
Definition: datastore.h:104
struct GNUNET_TIME_AbsoluteNBO min_expiration
Minimum expiration time required for content to be stored by the datacache at this time...
Definition: datastore.h:78
#define GNUNET_NO
Definition: gnunet_common.h:78
const char * GNUNET_h2s(const struct GNUNET_HashCode *hc)
Convert a hash value to a string (for printing debug messages).
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
static void handle_data(void *cls, const struct DataMessage *dm)
Handle data message we got from the service.
Message to the datastore service asking about zero anonymity content.
Definition: datastore.h:163
#define GNUNET_new(type)
Allocate a struct or union of the given type.
static struct GNUNET_DATASTORE_QueueEntry * get_queue_head(struct GNUNET_DATASTORE_Handle *h, uint16_t response_type)
Get the entry at the head of the message queue.
static void handle_status(void *cls, const struct StatusMessage *sm)
Function called to handle status message from the service.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:150
#define GNUNET_MESSAGE_TYPE_DATASTORE_RESERVE
Message sent by datastore client on join.
#define GNUNET_MESSAGE_TYPE_DATASTORE_PUT
Message sent by datastore client to store data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_release_reserve(struct GNUNET_DATASTORE_Handle *h, uint32_t rid, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Signal that all of the data for which a reservation was made has been stored and that whatever excess...
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
void GNUNET_STATISTICS_destroy(struct GNUNET_STATISTICS_Handle *h, int sync_first)
Destroy a handle (free all state associated with it).
void GNUNET_STATISTICS_update(struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent)
Set statistic value for the peer.
Handle for the service.
const struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
static void drop_status_cont(void *cls, int32_t result, struct GNUNET_TIME_Absolute min_expiration, const char *emsg)
Dummy continuation used to do nothing (but be non-zero).
#define GNUNET_break(cond)
Use this for internal assertion violations that are not fatal (can be handled) but should not occur...
static struct GNUNET_ARM_Handle * h
Connection with ARM.
Definition: gnunet-arm.c:94
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
struct StatusContext sc
Definition: datastore_api.c:87
#define GNUNET_MESSAGE_TYPE_DATASTORE_RELEASE_RESERVE
Message sent by datastore client on join.
static struct GNUNET_ARM_MonitorHandle * m
Monitor connection with ARM.
Definition: gnunet-arm.c:99
structs for communication between datastore service and API
GNUNET_DATASTORE_DatumProcessor proc
Function to call with the result.
Definition: datastore_api.c:74
struct ResultContext rc
Definition: datastore_api.c:89
unsigned int max_queue
Maximum allowed length of queue (otherwise this request should be discarded).
#define GNUNET_MQ_msg_extra(mvar, esize, type)
Allocate an envelope, with extra space allocated after the space needed by the message struct...
Definition: gnunet_mq_lib.h:52
struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed(struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls)
Schedule a new task to be run with a specified delay.
Definition: scheduler.c:1237
static void free_queue_entry(struct GNUNET_DATASTORE_QueueEntry *qe)
Free a queue entry.
Entry in our priority queue.
Definition: datastore_api.c:96
#define GNUNET_MESSAGE_TYPE_DATASTORE_DROP
Message sent by datastore client to drop the database.
void * cls
Closure for mv and cb.
void GNUNET_MQ_notify_sent(struct GNUNET_MQ_Envelope *ev, GNUNET_SCHEDULER_TaskCallback cb, void *cb_cls)
Call a callback once the envelope has been sent, that is, sending it can not be canceled anymore...
Definition: mq.c:772
unsigned int skip_next_messages
We should ignore the next message(s) from the service.
#define GNUNET_MQ_hd_var_size(name, code, str, ctx)
Message to the datastore service asking about specific content.
Definition: datastore.h:136
uint32_t type
Desired content type.
Definition: datastore.h:113
struct ListEntry * entries
List of peers in the list.
GNUNET_DATASTORE_ContinuationWithStatus cont
Continuation to call with the status.
Definition: datastore_api.c:58
const char * GNUNET_STRINGS_relative_time_to_string(struct GNUNET_TIME_Relative delta, int do_round)
Give relative time in human-readable fancy format.
Definition: strings.c:686
void(* GNUNET_DATASTORE_DatumProcessor)(void *cls, const struct GNUNET_HashCode *key, size_t size, const void *data, enum GNUNET_BLOCK_Type type, uint32_t priority, uint32_t anonymity, uint32_t replication, struct GNUNET_TIME_Absolute expiration, uint64_t uid)
Process a datum that was stored in the datastore.
uint16_t status
See PRISM_STATUS_*-constants.
static int result
Global testing status.
#define GNUNET_CONTAINER_DLL_insert_after(head, tail, other, element)
Insert an element into a DLL after the given other element.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_remove(struct GNUNET_DATASTORE_Handle *h, const struct GNUNET_HashCode *key, size_t size, const void *data, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_ContinuationWithStatus cont, void *cont_cls)
Explicitly remove some content from the database.
static void disconnect_after_drop(void *cls)
Task used to disconnect from the datastore after we send the GNUNET_MESSAGE_TYPE_DATASTORE_DROP me...
A 512-bit hashcode.
Message handler for a specific message type.
uint32_t type
Desired content type.
Definition: datastore.h:145
#define LOG(kind,...)
Definition: datastore_api.c:34
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END
Message sent by datastore to client signaling end of matching data.
uint32_t replication
Desired replication level.
Definition: datastore.h:222
static struct GNUNET_DATASTORE_QueueEntry * make_queue_entry(struct GNUNET_DATASTORE_Handle *h, struct GNUNET_MQ_Envelope *env, unsigned int queue_priority, unsigned int max_queue_size, uint16_t expected_type, const union QueueContext *qc)
Create a new entry for our priority queue (and possibly discard other entries if the queue is getting...
void GNUNET_MQ_discard(struct GNUNET_MQ_Envelope *mqm)
Discard the message queue message, free all allocated resources.
Definition: mq.c:319
uint32_t rid
Reservation ID to use; use zero for none.
Definition: datastore.h:197
static struct GNUNET_CONFIGURATION_Handle * cfg
Our configuration.
Definition: gnunet-arm.c:104
uint64_t GNUNET_htonll(uint64_t n)
Convert unsigned 64-bit integer to network byte order.
Definition: common_endian.c:35
union QueueContext qc
Context for the operation.
struct GNUNET_SCHEDULER_Task * delay_warn_task
Task we run if this entry stalls the queue and we need to warn the user.
Message transmitting content from or to the datastore service.
Definition: datastore.h:185
struct GNUNET_HashCode key
Desired key.
Definition: datastore.h:128
struct GNUNET_HashCode key
The key used in the DHT.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static unsigned int size
Size of the "table".
Definition: peer.c:66
Context for a queue operation.
Definition: datastore_api.c:86
uint32_t size
Number of bytes in the item (NBO).
Definition: datastore.h:202
uint32_t anonymity
Desired anonymity level (NBO), zero for remove.
Definition: datastore.h:217
void(* GNUNET_DATASTORE_ContinuationWithStatus)(void *cls, int32_t success, struct GNUNET_TIME_Absolute min_expiration, const char *msg)
Continuation called to notify client about result of the operation.
struct GNUNET_MQ_Envelope * env
Definition: 005.c:1
static struct GNUNET_FS_SearchContext * sc
Definition: gnunet-search.c:37
#define GNUNET_TIME_STD_BACKOFF(r)
Perform our standard exponential back-off calculation, starting at 1 ms and then going by a factor of...
int32_t rid
Reservation id.
Definition: datastore.h:96
Context for processing result messages.
Definition: datastore_api.c:70
#define GNUNET_TIME_UNIT_ZERO
Relative time zero.
#define GNUNET_MAX_MESSAGE_SIZE
Largest supported message (to be precise, one byte more than the largest possible message...
struct GNUNET_MessageHeader header
Type is either GNUNET_MESSAGE_TYPE_DATASTORE_PUT, GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE or GNUNET_MESS...
Definition: datastore.h:192
static void disconnect_on_mq_error(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
Handle to a message queue.
Definition: mq.c:84
static int check_data(void *cls, const struct DataMessage *dm)
Check data message we received from the service.
void GNUNET_DATASTORE_cancel(struct GNUNET_DATASTORE_QueueEntry *qe)
Cancel a datastore operation.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:177
uint64_t uid
Unique ID for the content (can be used for UPDATE); can be zero for remove (which indicates that the ...
Definition: datastore.h:235
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY
Message sent by datastore client to get random data.
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET_KEY
Message sent by datastore client to get data by key.
configuration data
Definition: configuration.c:83
void GNUNET_DATASTORE_disconnect(struct GNUNET_DATASTORE_Handle *h, int drop)
Disconnect from the datastore service (and free associated resources).
struct GNUNET_HashCode key
Key under which the item can be found.
Definition: datastore.h:245
static void do_disconnect(struct GNUNET_DATASTORE_Handle *h)
Disconnect from the service and then try reconnecting to the datastore service after some delay...
uint32_t priority
Priority of the item (NBO), zero for remove.
Definition: datastore.h:212
Handle to the datastore service.
struct GNUNET_DATASTORE_QueueEntry * queue_head
Current head of priority queue.
static void handle_data_end(void *cls, const struct GNUNET_MessageHeader *msg)
Type of a function to call when we receive a GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END message from the ...
#define GNUNET_log(kind,...)
Entry in list of pending tasks.
Definition: scheduler.c:131
Message from datastore service informing client about the success or failure of a requested operation...
Definition: datastore.h:63
struct GNUNET_TIME_Relative retry_time
How quickly should we retry? Used for exponential back-off on connect-errors.
uint16_t response_type
Expected response type.
struct GNUNET_TIME_Relative GNUNET_TIME_absolute_get_remaining(struct GNUNET_TIME_Absolute future)
Given a timestamp in the future, how much time remains until then?
Definition: time.c:331
enum GNUNET_TESTBED_UnderlayLinkModelType type
The type of this model.
void * cont_cls
Closure for cont.
Header for all communications.
Time for absolute times used by GNUnet, in microseconds.
void GNUNET_MQ_destroy(struct GNUNET_MQ_Handle *mq)
Destroy the message queue.
Definition: mq.c:821
#define GNUNET_YES
Definition: gnunet_common.h:77
static unsigned int anonymity
int32_t status
Status code, -1 for errors.
Definition: datastore.h:72
void * cont_cls
Closure for cont.
Definition: datastore_api.c:63
void GNUNET_MQ_send(struct GNUNET_MQ_Handle *mq, struct GNUNET_MQ_Envelope *ev)
Send a message with the given message queue.
Definition: mq.c:351
struct GNUNET_DATASTORE_QueueEntry * next
This is a linked list.
static void mq_error_handler(void *cls, enum GNUNET_MQ_Error error)
Handle error in sending drop request to datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_REMOVE
Message sent by datastore client to remove data.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_key(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, bool random, const struct GNUNET_HashCode *key, enum GNUNET_BLOCK_Type type, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a result for a particular key from the datastore.
uint64_t next_uid
UID at which to start the search.
Definition: datastore.h:118
#define GNUNET_MESSAGE_TYPE_DATASTORE_GET
Message sent by datastore client to get data.
uint32_t data
The data value.
struct GNUNET_DATASTORE_Handle * GNUNET_DATASTORE_connect(const struct GNUNET_CONFIGURATION_Handle *cfg)
Connect to the datastore service.
struct GNUNET_TIME_AbsoluteNBO GNUNET_TIME_absolute_hton(struct GNUNET_TIME_Absolute a)
Convert absolute time to network byte order.
Definition: time.c:655
unsigned int priority
Priority in the queue.
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_for_replication(struct GNUNET_DATASTORE_Handle *h, unsigned int queue_priority, unsigned int max_queue_size, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a random value from the datastore for content replication.
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_DATASTORE_STATUS.
Definition: datastore.h:67
static int check_status(void *cls, const struct StatusMessage *sm)
Function called to check status message from the service.
#define GNUNET_MQ_handler_end()
End-marker for the handlers array.
#define DELAY_WARN_TIMEOUT
Definition: datastore_api.c:36
struct GNUNET_DATASTORE_QueueEntry * GNUNET_DATASTORE_get_zero_anonymity(struct GNUNET_DATASTORE_Handle *h, uint64_t next_uid, unsigned int queue_priority, unsigned int max_queue_size, enum GNUNET_BLOCK_Type type, GNUNET_DATASTORE_DatumProcessor proc, void *proc_cls)
Get a single zero-anonymity value from the datastore.
#define GNUNET_MESSAGE_TYPE_DATASTORE_DATA
Message sent by datastore to client providing requested data (in response to GET or GET_RANDOM request...
Context for processing status messages.
Definition: datastore_api.c:54
uint64_t GNUNET_ntohll(uint64_t n)
Convert unsigned 64-bit integer to host byte order.
Definition: common_endian.c:48
#define GNUNET_free(ptr)
Wrapper around free.
Time for relative time used by GNUnet, in microseconds.
#define gettext_noop(String)
Definition: gettext.h:69
void * GNUNET_SCHEDULER_cancel(struct GNUNET_SCHEDULER_Task *task)
Cancel the task with the specified identifier.
Definition: scheduler.c:956
uint64_t amount
Number of bytes to reserve.
Definition: datastore.h:53