GNUnet  0.17.6
gnunet-service-tng.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
75 #include "platform.h"
76 #include "gnunet_util_lib.h"
80 #include "gnunet_hello_lib.h"
81 #include "gnunet_signatures.h"
82 #include "transport.h"
83 
/* Tunable limits and timeouts of the transport service.
   NOTE(review): the explanatory comment that precedes each macro in the full
   file is not part of this listing.  The notes below state only the value
   and, where it is visible in this part of the file, where it is used;
   anything marked "presumably" is inferred from the macro name and should
   be confirmed. */
87 #define MAX_FC_RETRANSMIT_COUNT 1000
88 
93 #define MAX_CUMMULATIVE_ACKS 64
94 
/* Presumably the N of a 1-in-N chance -- confirm at the use site. */
107 #define FC_NO_CHANGE_REPLY_PROBABILITY 8
108 
113 #define IN_PACKET_SIZE_WITHOUT_MTU 128
114 
119 #define GOODPUT_AGING_SLOTS 4
120 
/* 128 * 1024 = 131072. */
125 #define DEFAULT_WINDOW_SIZE (128 * 1024)
126 
135 #define MAX_INCOMING_REQUEST 16
136 
141 #define MAX_DV_DISCOVERY_SELECTION 16
142 
151 #define RECV_WINDOW_SIZE 4
152 
160 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
161 
/* Also used as the base for weighting distance-vector hops: the hop
   selection code further down adds (MAX_DV_HOPS_ALLOWED - distance) per
   hop. */
165 #define MAX_DV_HOPS_ALLOWED 16
166 
171 #define MAX_DV_LEARN_PENDING 64
172 
176 #define MAX_DV_PATHS_TO_TARGET 3
177 
/* The macros from here to MIN_DELAY_ADDRESS_VALIDATION are durations built
   from GNUnet time-unit constants (DV_LEARN_QUALITY_THRESHOLD is the one
   plain number among them). */
/* 5 seconds. */
183 #define DELAY_WARN_THRESHOLD \
184  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
185 
/* 60 seconds. */
190 #define DV_FORWARD_TIMEOUT \
191  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
192 
/* 1 second. */
196 #define DEFAULT_ACK_WAIT_DURATION \
197  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
198 
/* 1 second. */
204 #define DV_QUALITY_RTT_THRESHOLD \
205  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
206 
/* 5 minutes. */
211 #define DV_PATH_VALIDITY_TIMEOUT \
212  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
213 
/* 5 minutes. */
218 #define BACKCHANNEL_INACTIVITY_TIMEOUT \
219  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
220 
/* 4 minutes. */
225 #define DV_PATH_DISCOVERY_FREQUENCY \
226  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
227 
/* 4 hours. */
231 #define EPHEMERAL_VALIDITY \
232  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
233 
/* 4 minutes. */
237 #define REASSEMBLY_EXPIRATION \
238  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
239 
/* 1 minute. */
244 #define FAST_VALIDATION_CHALLENGE_FREQ \
245  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
246 
/* 1 day. */
250 #define MAX_VALIDATION_CHALLENGE_FREQ \
251  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
252 
/* 4 hours. */
258 #define ACK_CUMMULATOR_TIMEOUT \
259  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
260 
/* One minute (the unit constant itself, not multiplied). */
265 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
266 
271 #define DV_LEARN_QUALITY_THRESHOLD 100
272 
/* 1 month. */
276 #define MAX_ADDRESS_VALID_UNTIL \
277  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
278 
/* 4 hours. */
282 #define ADDRESS_VALIDATION_LIFETIME \
283  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
284 
/* One millisecond (the unit constant itself, not multiplied). */
291 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
292 
299 #define VALIDATION_RTT_BUFFER_FACTOR 3
300 
/* Compared against a communicator's total_queue_length when a queue is torn
   down further below, to decide whether throttled queues may resume. */
307 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
308 
/* Per-queue cap: transmission is throttled further below once a queue's
   queue_length reaches this value. */
314 #define QUEUE_LENGTH_LIMIT 32
315 
316 
318 
323 {
328  uint64_t uuid GNUNET_PACKED;
329 };
330 
331 
336 {
340  struct GNUNET_Uuid value;
341 };
342 
347 {
352 
353  /* Followed by *another* message header which is the message to
354  the communicator */
355 
356  /* Followed by a 0-terminated name of the communicator */
357 };
358 
359 
364 {
369 
385 
390 
396 };
397 
398 
404 {
409 
415 
427 
428  /* Followed by a `struct GNUNET_MessageHeader` with a message
429  for the target peer */
430 };
431 
432 
438 {
443 
451 
458 };
459 
460 
465 {
473 
478 };
479 
480 
489 {
494 
500 
501  /* followed by any number of `struct TransportCummulativeAckPayloadP`
502  messages providing ACKs */
503 };
504 
505 
510 {
515 
520 
525 
534 
539  struct MessageUUIDP msg_uuid;
540 };
541 
542 
560 struct DvInitPS
561 {
566 
580 
585 };
586 
587 
604 struct DvHopPS
605 {
610 
614  struct GNUNET_PeerIdentity pred;
615 
619  struct GNUNET_PeerIdentity succ;
620 
625 };
626 
627 
633 {
637  struct GNUNET_PeerIdentity hop;
638 
644 };
645 
646 
661 {
666 
672 
682 
689 
703 
709 
714 
719 
720  /* Followed by @e num_hops `struct DVPathEntryP` values,
721  excluding the initiator of the DV trace; the last entry is the
722  current sender; the current peer must not be included. */
723 };
724 
725 
749 {
754 
758  unsigned int without_fc;
759 
767 
774 
780 
786  struct GNUNET_ShortHashCode iv;
787 
793  struct GNUNET_HashCode hmac;
794 
804 
805  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
806  excluding the @e origin and the current peer, the last must be
807  the ultimate target; if @e num_hops is zero, the receiver of this
808  message is the ultimate target. */
809 
810  /* Followed by encrypted, variable-size payload, which
811  must begin with a `struct TransportDVBoxPayloadP` */
812 
813  /* Followed by the actual message, which itself must not be
814  a DV_LEARN or DV_BOX message! */
815 };
816 
817 
823 {
828 
833 
838 
844 };
845 
846 
852 {
857 
863 
868 };
869 
870 
876 {
881 
886 
892 
897 
902  struct GNUNET_TIME_AbsoluteNBO origin_time;
903 
908  struct GNUNET_TIME_RelativeNBO validity_duration;
909 };
910 
911 
921 {
926 
934  uint32_t seq GNUNET_PACKED;
935 
941 
948 
958 
968 };
969 
970 
972 
973 
978 {
982  CT_NONE = 0,
983 
987  CT_CORE = 1,
988 
993 
998 
1002  CT_APPLICATION = 4
1003 };
1004 
1005 
1011 {
1016 
1021 
1026 
1031 
1037  RMO_REDUNDANT = 4
1038 };
1039 
1040 
1045 {
1050 
1055 
1060 
1066 };
1067 
1068 
1074 {
1078  uint64_t bytes_sent;
1079 
1084  uint64_t bytes_received;
1085 };
1086 
1087 
1092 {
1097 
1103 
1108  unsigned int last_age;
1109 };
1110 
1111 
1115 struct TransportClient;
1116 
1120 struct Neighbour;
1121 
1126 struct DistanceVector;
1127 
1132 struct Queue;
1133 
1137 struct PendingMessage;
1138 
1142 struct DistanceVectorHop;
1143 
1152 struct VirtualLink;
1153 
1154 
1160 {
1166 
1172 
1177 
1181  struct GNUNET_TRANSPORT_IncomingMessage im;
1182 
1187  uint16_t total_hops;
1188 };
1189 
1190 
1195 {
1200 
1205 
1209  struct VirtualLink *vl;
1210 
1214  uint16_t size;
1215 
1222  uint16_t isize;
1223 };
1224 
1225 
1230 {
1235  struct MessageUUIDP msg_uuid;
1236 
1241 
1246 
1254  uint8_t *bitfield;
1255 
1260 
1266 
1270  uint16_t msg_size;
1271 
1276  uint16_t msg_missing;
1277 
1278  /* Followed by @e msg_size bytes of the (partially) defragmented original
1279  * message */
1280 
1281  /* Followed by @e bitfield data */
1282 };
1283 
1284 
1294 {
1298  struct GNUNET_PeerIdentity target;
1299 
1306 
1313 
1318 
1324 
1330 
1335 
1340 
1345 
1350 
1358 
1364 
1368  unsigned int fc_retransmit_count;
1369 
1374  unsigned int confirmed;
1375 
1379  struct Neighbour *n;
1380 
1385 
1392 
1399 
1408 
1414 
1420 
1429 
1437 
1444 
1453 
1466 
1472 
1479 
1490 
1495  uint32_t fc_seq_gen;
1496 
1502  uint32_t last_fc_seq;
1503 
1516 };
1517 
1518 
1523 {
1529 
1535 
1542 
1549 
1556 
1563 
1570 
1577 
1582 
1588 
1594 
1599  struct Queue *queue;
1600 
1605 
1609  uint16_t message_size;
1610 };
1611 
1612 
1617 {
1622 
1627 
1632 
1637 
1642 
1647 
1652 
1657 
1663  const struct GNUNET_PeerIdentity *path;
1664 
1670 
1679 
1683  struct PerformanceData pd;
1684 
1690  unsigned int distance;
1691 };
1692 
1693 
1699 {
1703  struct GNUNET_PeerIdentity target;
1704 
1709 
1714 
1719 
1724  struct VirtualLink *vl;
1725 
1731 
1736 
1741 
1746 
1751 };
1752 
1753 
1763 struct QueueEntry
1764 {
1768  struct QueueEntry *next;
1769 
1773  struct QueueEntry *prev;
1774 
1778  struct Queue *queue;
1779 
1784 
1788  uint64_t mid;
1789 };
1790 
1791 
1796 struct Queue
1797 {
1802 
1807 
1812 
1817 
1822 
1827 
1832 
1837 
1842 
1847 
1851  const char *address;
1852 
1856  unsigned int unlimited_length;
1857 
1863 
1872 
1876  struct PerformanceData pd;
1877 
1882  uint64_t mid_gen;
1883 
1887  uint32_t qid;
1888 
1892  uint32_t mtu;
1893 
1898 
1903 
1907  unsigned int queue_length;
1908 
1912  uint64_t q_capacity;
1913 
1917  uint32_t priority;
1918 
1922  enum GNUNET_NetworkType nt;
1923 
1928 
1933  int idle;
1934 };
1935 
1936 
1940 struct Neighbour
1941 {
1945  struct GNUNET_PeerIdentity pid;
1946 
1952 
1958 
1963 
1968 
1974 
1980 
1985  struct VirtualLink *vl;
1986 
1992 
1998 };
1999 
2000 
2006 {
2011 
2016 
2021 
2025  struct GNUNET_PeerIdentity pid;
2026 };
2027 
2028 
2032 struct PeerRequest
2033 {
2037  struct GNUNET_PeerIdentity pid;
2038 
2043 
2048 
2055 
2060 };
2061 
2062 
2067 {
2072 
2077 
2082 
2086  PMT_DV_BOX = 3
2087 };
2088 
2089 
2116 struct PendingMessage
2117 {
2122 
2127 
2132 
2137 
2143 
2149 
2154 
2159 
2165 
2169  struct VirtualLink *vl;
2170 
2179  struct QueueEntry *qe;
2180 
2185 
2190 
2195 
2200 
2205 
2210 
2215  struct MessageUUIDP msg_uuid;
2216 
2221  unsigned long long logging_uuid;
2222 
2226  enum PendingMessageType pmt;
2227 
2233 
2238 
2242  uint16_t bytes_msg;
2243 
2247  uint16_t frag_off;
2248 
2252  int16_t msg_uuid_set;
2253 
2254  /* Followed by @e bytes_msg to transmit */
2255 };
2256 
2257 
2262 {
2268 
2273 };
2274 
2275 
2281 {
2285  struct GNUNET_PeerIdentity target;
2286 
2291 
2298 
2303 
2309  uint32_t ack_counter;
2310 
2314  unsigned int num_acks;
2315 };
2316 
2317 
2322 {
2327 
2332 
2337 
2341  const char *address;
2342 
2347 
2352 
2358 
2362  uint32_t aid;
2363 
2367  enum GNUNET_NetworkType nt;
2368 };
2369 
2370 
2375 {
2380 
2385 
2390 
2395 
2399  enum ClientType type;
2400 
2401  union
2402  {
2406  struct
2407  {
2413 
2418  } core;
2419 
2423  struct
2424  {
2430  struct GNUNET_PeerIdentity peer;
2431 
2437 
2438 
2442  struct
2443  {
2449 
2454 
2459 
2465 
2471 
2477  unsigned int total_queue_length;
2478 
2484 
2488  struct
2489  {
2497 };
2498 
2499 
2505 {
2510  struct GNUNET_PeerIdentity pid;
2511 
2519 
2525 
2532  struct GNUNET_TIME_Absolute first_challenge_use;
2533 
2540  struct GNUNET_TIME_Absolute last_challenge_use;
2541 
2549  struct GNUNET_TIME_Absolute next_challenge;
2550 
2559  struct GNUNET_TIME_Relative challenge_backoff;
2560 
2565  struct GNUNET_TIME_Relative validation_rtt;
2566 
2574  struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2575 
2579  char *address;
2580 
2586  struct GNUNET_CONTAINER_HeapNode *hn;
2587 
2593 
2599  uint32_t last_window_consum_limit;
2600 
2605  int awaiting_queue;
2606 };
2607 
2608 
2616 {
2620  struct GNUNET_PeerIdentity pid;
2621 
2626 
2631 
2636 
2642 
2647 
2653 
2659 
2664  size_t body_size;
2665 };
2666 
2667 
2672 
2677 
2682 
2687 
2691 static struct GNUNET_PeerIdentity GST_my_identity;
2692 
2697 
2703 
2709 
2715 
2721 
2727 
2733 
2739 
2744 
2748 static struct LearnLaunchEntry *lle_head = NULL;
2749 
2753 static struct LearnLaunchEntry *lle_tail = NULL;
2754 
2761 
2766 
2771 
2776 
2782 
2788 
2794 static struct IncomingRequest *ir_head;
2795 
2799 static struct IncomingRequest *ir_tail;
2800 
2804 static unsigned int ir_total;
2805 
2809 static unsigned long long logging_uuid_gen;
2810 
2815 static unsigned int pa_count;
2816 
2826 
2831 static int in_shutdown;
2832 
2843 static unsigned int
2845 {
2846  struct GNUNET_TIME_Absolute now;
2847 
2848  now = GNUNET_TIME_absolute_get ();
2849  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2850 }
2851 
2852 
2858 static void
2860 {
2862  GNUNET_assert (ir_total > 0);
2863  ir_total--;
2865  ir->wc = NULL;
2866  GNUNET_free (ir);
2867 }
2868 
2869 
/**
 * Release a pending acknowledgement (the log text below names the routine
 * "free_pending_acknowledgement").
 *
 * The visible code unlinks `pa` from up to three lists -- the one of its
 * queue, the one of its pending message and the one of its distance-vector
 * hop -- and then frees `pa`.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing, as are the heads of the logging calls and of
 * the final assertion; only their trailing argument lines survive.
 */
2875 static void
2877 {
2878  struct Queue *q = pa->queue;
2879  struct PendingMessage *pm = pa->pm;
2880  struct DistanceVectorHop *dvh = pa->dvh;
2881 
2883  "free_pending_acknowledgement\n");
2884  if (NULL != q)
2885  {
/* Unlink from the queue's list of pending acknowledgements. */
2886  GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
2887  pa->queue = NULL;
2888  }
2889  if (NULL != pm)
2890  {
2892  "remove pa from message\n");
2894  "remove pa from message %llu\n",
2895  pm->logging_uuid);
2897  "remove pa from message %u\n",
2898  pm->pmt);
2900  "remove pa from message %s\n",
2901  GNUNET_uuid2s (&pa->ack_uuid.value));
/* Unlink from the pending message's list of acknowledgements. */
2902  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2903  pa->pm = NULL;
2904  }
2905  if (NULL != dvh)
2906  {
2907  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
/* NOTE(review): this clears pa->queue a second time and leaves pa->dvh set.
   The two sibling branches reset exactly the pointer they just unlinked,
   so `pa->dvh = NULL` looks intended.  Harmless at present because pa is
   freed a few lines below, but worth correcting. */
2908  pa->queue = NULL;
2909  }
2912  &pa->ack_uuid.value,
2913  pa));
2914  GNUNET_free (pa);
2915 }
2916 
2917 
2926 static void
2928 {
2929  struct PendingMessage *frag;
2930 
2931  while (NULL != (frag = root->head_frag))
2932  {
2933  struct PendingAcknowledgement *pa;
2934 
2935  free_fragment_tree (frag);
2936  while (NULL != (pa = frag->pa_head))
2937  {
2938  GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
2939  pa->pm = NULL;
2940  }
2941  GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
2942  if (NULL != frag->qe)
2943  {
2944  GNUNET_assert (frag == frag->qe->pm);
2945  frag->qe->pm = NULL;
2947  frag->qe->queue->queue_tail,
2948  frag->qe);
2950  "Removing QueueEntry MID %llu from queue\n",
2951  frag->qe->mid);
2952  GNUNET_free (frag->qe);
2953  }
2955  "Free frag %p\n",
2956  frag);
2957  GNUNET_free (frag);
2958  }
2959 }
2960 
2961 
/**
 * Release a pending message `pm`.
 *
 * Visible steps: detach every pending acknowledgement still linked to the
 * message (the acknowledgements themselves are not freed here), remove and
 * free the message's queue entry if it has one, free the `bpm` fragment tree
 * and the `bpm` message if set, then free `pm` itself.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing, and so are the heads of the logging calls and
 * of the two list-removal calls near the top (only their argument lines
 * survive).
 */
2969 static void
2971 {
2972  struct TransportClient *tc = pm->client;
2973  struct VirtualLink *vl = pm->vl;
2974  struct PendingAcknowledgement *pa;
2975 
2977  "Freeing pm %p\n",
2978  pm);
2979  if (NULL != tc)
2980  {
2982  tc->details.core.pending_msg_head,
2983  tc->details.core.pending_msg_tail,
2984  pm);
2985  }
2986  if ((NULL != vl) && (NULL == pm->frag_parent))
2987  {
/* NOTE(review): logging_uuid is declared `unsigned long long` in
   struct PendingMessage and is printed with "%llu" elsewhere in this file;
   "%lu" is a format/argument mismatch where long is 32 bits wide. */
2989  "Removing pm %lu\n",
2990  pm->logging_uuid);
2992  vl->pending_msg_head,
2993  vl->pending_msg_tail,
2994  pm);
2995  }
2996  while (NULL != (pa = pm->pa_head))
2997  {
/* NOTE(review): the loop condition already guarantees pa != NULL, so this
   first check can never fire; the checks in this loop look like leftover
   debugging aids. */
2998  if (NULL == pa)
3000  "free pending pa null\n");
3001  if (NULL == pm->pa_tail)
3003  "free pending pa_tail null\n");
3004  if (NULL == pa->prev_pa)
3006  "free pending pa prev null\n");
3007  if (NULL == pa->next_pa)
3009  "free pending pa next null\n");
3010  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3011  pa->pm = NULL;
3012  }
3013 
3015  if (NULL != pm->qe)
3016  {
/* The queue entry must point back at this message before it is unlinked
   from its queue and freed. */
3017  GNUNET_assert (pm == pm->qe->pm);
3018  pm->qe->pm = NULL;
3019  GNUNET_CONTAINER_DLL_remove (pm->qe->queue->queue_head,
3020  pm->qe->queue->queue_tail,
3021  pm->qe);
3023  "Removing QueueEntry MID %llu from queue\n",
3024  pm->qe->mid);
3025  GNUNET_free (pm->qe);
3026  }
3027  if (NULL != pm->bpm)
3028  {
3029  free_fragment_tree (pm->bpm);
3030  GNUNET_free (pm->bpm);
3031  }
/* NOTE(review): pm has been dereferenced since the first line of the
   function, so this check cannot be true on any path that reaches it. */
3032  if (NULL == pm)
3034  "free pending pm null\n");
3035  GNUNET_free (pm);
3036 }
3037 
3038 
3044 static void
3046 {
3047  struct VirtualLink *vl = rc->virtual_link;
3048 
3052  rc->msg_uuid.uuid,
3053  rc));
3054  GNUNET_free (rc);
3055 }
3056 
3057 
3063 static void
3065 {
3066  struct VirtualLink *vl = cls;
3067  struct ReassemblyContext *rc;
3068 
3069  vl->reassembly_timeout_task = NULL;
3070  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3071  {
3073  .rel_value_us)
3074  {
3076  continue;
3077  }
3078  GNUNET_assert (NULL == vl->reassembly_timeout_task);
3082  vl);
3083  return;
3084  }
3085 }
3086 
3087 
/**
 * Callback with the (cls, key, value) shape of a container iterator, applied
 * to one reassembly context.
 *
 * NOTE(review): the statement that acts on `rc` (source line 3103) is missing
 * from this listing; judging by the function name it releases the context --
 * confirm against the full file.
 *
 * @param cls unused
 * @param key unused
 * @param value the `struct ReassemblyContext` being visited
 * @return #GNUNET_OK, always
 */
3096 static int
3097 free_reassembly_cb (void *cls, uint32_t key, void *value)
3098 {
3099  struct ReassemblyContext *rc = value;
3100 
3101  (void) cls;
3102  (void) key;
3104  return GNUNET_OK;
3105 }
3106 
3107 
3113 static void
3115 {
3116  struct PendingMessage *pm;
3117  struct CoreSentContext *csc;
3118 
3120  "free virtual link %p\n",
3121  vl);
3122 
3123  if (NULL != vl->reassembly_map)
3124  {
3127  NULL);
3129  vl->reassembly_map = NULL;
3131  vl->reassembly_heap = NULL;
3132  }
3133  if (NULL != vl->reassembly_timeout_task)
3134  {
3136  vl->reassembly_timeout_task = NULL;
3137  }
3138  while (NULL != (pm = vl->pending_msg_head))
3142  if (NULL != vl->visibility_task)
3143  {
3145  vl->visibility_task = NULL;
3146  }
3147  if (NULL != vl->fc_retransmit_task)
3148  {
3150  vl->fc_retransmit_task = NULL;
3151  }
3152  while (NULL != (csc = vl->csc_head))
3153  {
3155  GNUNET_assert (vl == csc->vl);
3156  csc->vl = NULL;
3157  }
3158  GNUNET_break (NULL == vl->n);
3159  GNUNET_break (NULL == vl->dv);
3160  GNUNET_free (vl);
3161 }
3162 
3163 
3169 static void
3171 {
3172  GNUNET_assert (
3173  GNUNET_YES ==
3176  vs->hn = NULL;
3177  if (NULL != vs->sc)
3178  {
3180  "store cancel\n");
3182  vs->sc = NULL;
3183  }
3184  GNUNET_free (vs->address);
3185  GNUNET_free (vs);
3186 }
3187 
3188 
3195 static struct Neighbour *
3197 {
3199 }
3200 
3201 
3208 static struct VirtualLink *
3210 {
3212 }
3213 
3214 
3219 {
3226 
3230  struct GNUNET_TIME_Relative rtt;
3231 
3236 
3241 
3246 };
3247 
3248 
3257 static void
3259 {
3260  struct Neighbour *n = dvh->next_hop;
3261  struct DistanceVector *dv = dvh->dv;
3262  struct PendingAcknowledgement *pa;
3263 
3264  while (NULL != (pa = dvh->pa_head))
3265  {
3267  pa->dvh = NULL;
3268  }
3269  GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3271  GNUNET_free (dvh);
3272 }
3273 
3274 
/**
 * Forward declaration; the definition appears further down in this file.
 * Task that checks whether a virtual link is still backed by a neighbour or
 * a distance-vector route and frees the link when it is not.
 *
 * @param cls the `struct VirtualLink` to check
 */
3281 static void
3282 check_link_down (void *cls);
3283 
3284 
3290 static void
3292 {
3294  "Informing CORE clients about disconnect from %s\n",
3295  GNUNET_i2s (pid));
3296  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3297  {
3298  struct GNUNET_MQ_Envelope *env;
3299  struct DisconnectInfoMessage *dim;
3300 
3301  if (CT_CORE != tc->type)
3302  continue;
3304  dim->peer = *pid;
3305  GNUNET_MQ_send (tc->mq, env);
3306  }
3307 }
3308 
3309 
3316 static void
3318 {
3319  struct DistanceVectorHop *dvh;
3320 
3321  while (NULL != (dvh = dv->dv_head))
3323  if (NULL == dv->dv_head)
3324  {
3325  struct VirtualLink *vl;
3326 
3327  GNUNET_assert (
3328  GNUNET_YES ==
3330  if (NULL != (vl = dv->vl))
3331  {
3332  GNUNET_assert (dv == vl->dv);
3333  vl->dv = NULL;
3334  if (NULL == vl->n)
3335  {
3337  free_virtual_link (vl);
3338  }
3339  else
3340  {
3343  }
3344  dv->vl = NULL;
3345  }
3346 
3347  if (NULL != dv->timeout_task)
3348  {
3350  dv->timeout_task = NULL;
3351  }
3352  GNUNET_free (dv);
3353  }
3354 }
3355 
3356 
3370 static void
3372  const struct GNUNET_PeerIdentity *peer,
3373  const char *address,
3374  enum GNUNET_NetworkType nt,
3375  const struct MonitorEvent *me)
3376 {
3377  struct GNUNET_MQ_Envelope *env;
3378  struct GNUNET_TRANSPORT_MonitorData *md;
3379  size_t addr_len = strlen (address) + 1;
3380 
3381  env = GNUNET_MQ_msg_extra (md,
3382  addr_len,
3384  md->nt = htonl ((uint32_t) nt);
3385  md->peer = *peer;
3386  md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3387  md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3388  md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3389  md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3390  md->cs = htonl ((uint32_t) me->cs);
3391  md->num_msg_pending = htonl (me->num_msg_pending);
3392  md->num_bytes_pending = htonl (me->num_bytes_pending);
3393  memcpy (&md[1], address, addr_len);
3394  GNUNET_MQ_send (tc->mq, env);
3395 }
3396 
3397 
3407 static void
3409  const char *address,
3410  enum GNUNET_NetworkType nt,
3411  const struct MonitorEvent *me)
3412 {
3413  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3414  {
3415  if (CT_MONITOR != tc->type)
3416  continue;
3417  if (tc->details.monitor.one_shot)
3418  continue;
3419  if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3420  (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3421  continue;
3423  }
3424 }
3425 
3426 
3436 static void *
3438  struct GNUNET_SERVICE_Client *client,
3439  struct GNUNET_MQ_Handle *mq)
3440 {
3441  struct TransportClient *tc;
3442 
3443  (void) cls;
3444  tc = GNUNET_new (struct TransportClient);
3445  tc->client = client;
3446  tc->mq = mq;
3449  "Client %p of type %u connected\n",
3450  tc,
3451  tc->type);
3452  return tc;
3453 }
3454 
3455 
/**
 * Release a neighbour record and what hangs off it.
 *
 * The neighbour must have no queues left (asserted).  The visible code walks
 * the neighbour's distance-vector hops, freeing a DV route once it has no
 * hops left, cancels an outstanding peerstore iteration and store operation,
 * detaches the neighbour from its virtual link and finally frees the record.
 *
 * NOTE(review): several lines are missing from this listing: the head of the
 * assertion that ends in `neighbour));`, the head of the logging calls, the
 * statement inside the hop loop that must unlink `dvh` (otherwise the loop
 * would not terminate), and the bodies of both branches taken after the
 * virtual link's `n` pointer is cleared.
 *
 * @param neighbour the neighbour to free
 */
3461 static void
3462 free_neighbour (struct Neighbour *neighbour)
3463 {
3464  struct DistanceVectorHop *dvh;
3465  struct VirtualLink *vl;
3466 
3467  GNUNET_assert (NULL == neighbour->queue_head);
3470  &neighbour->pid,
3471  neighbour));
3473  "Freeing neighbour\n");
3474  while (NULL != (dvh = neighbour->dv_head))
3475  {
/* Remember the owning route first; it is checked for emptiness below. */
3476  struct DistanceVector *dv = dvh->dv;
3477 
3479  if (NULL == dv->dv_head)
3480  free_dv_route (dv);
3481  }
3482  if (NULL != neighbour->get)
3483  {
3484  GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
3485  neighbour->get = NULL;
3486  }
3487  if (NULL != neighbour->sc)
3488  {
3490  "store cancel\n");
3491  GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3492  neighbour->sc = NULL;
3493  }
3494  if (NULL != (vl = neighbour->vl))
3495  {
/* The link must point back at this neighbour; break the association in
   both directions. */
3496  GNUNET_assert (neighbour == vl->n);
3497  vl->n = NULL;
3498  if (NULL == vl->dv)
3499  {
3502  }
3503  else
3504  {
3507  }
3508  neighbour->vl = NULL;
3509  }
3510  GNUNET_free (neighbour);
3511 }
3512 
3513 
3520 static void
3522  const struct GNUNET_PeerIdentity *pid)
3523 {
3524  struct GNUNET_MQ_Envelope *env;
3525  struct ConnectInfoMessage *cim;
3526 
3527  GNUNET_assert (CT_CORE == tc->type);
3529  cim->id = *pid;
3530  GNUNET_MQ_send (tc->mq, env);
3531 }
3532 
3533 
3539 static void
3541 {
3543  "Informing CORE clients about connection to %s\n",
3544  GNUNET_i2s (pid));
3545  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3546  {
3547  if (CT_CORE != tc->type)
3548  continue;
3550  }
3551 }
3552 
3553 
/**
 * Forward declaration; the definition is outside the visible part of this
 * file.
 *
 * @param cls closure -- presumably the `struct Queue` to transmit on, since a
 *        queue's `transmit_task` is scheduled with the queue as its last
 *        argument further below; TODO confirm
 */
3561 static void
3562 transmit_on_queue (void *cls);
3563 
3564 
3568 static unsigned int
3570 {
3571  for (struct Queue *s = queue_head; NULL != s;
3572  s = s->next_client)
3573  {
3574  if (s->tc->details.communicator.address_prefix !=
3575  queue->tc->details.communicator.address_prefix)
3576  {
3578  "queue address %s qid %u compare with queue: address %s qid %u\n",
3579  queue->address,
3580  queue->qid,
3581  s->address,
3582  s->qid);
3583  if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3584  (QUEUE_LENGTH_LIMIT > s->queue_length) )
3585  return GNUNET_YES;
3587  "Lower prio\n");
3588  }
3589  }
3590  return GNUNET_NO;
3591 }
3592 
3593 
/**
 * Decide whether a queue may transmit and, if so, (re)schedule its transmit
 * task.
 *
 * The visible code returns early, without scheduling, when
 *  - a first check on the communicator's queue list succeeds (its head is
 *    not visible here),
 *  - the communicator's total queue length has reached a limit,
 *  - this queue's own length has reached QUEUE_LENGTH_LIMIT, or
 *  - the queue's `q_capacity` is zero.
 * Each of the three throttling cases bumps a statistics counter and sets
 * `queue->idle` to #GNUNET_NO.  Otherwise any already scheduled transmit
 * task is cancelled and a new one is stored in `queue->transmit_task`.
 *
 * NOTE(review): the line with the function name, one parameter line, the
 * heads of the log/statistics calls, the right-hand side of the first limit
 * comparison and the scheduler call itself are missing from this listing.
 *
 * @param queue the queue under consideration
 */
3601 static void
3603  struct Queue *queue,
3605 {
3607  queue->tc->details.communicator.
3608  queue_head))
3609  return;
3610 
3611  if (queue->tc->details.communicator.total_queue_length >=
3613  {
3615  "Transmission throttled due to communicator queue limit\n");
3617  GST_stats,
3618  "# Transmission throttled due to communicator queue limit",
3619  1,
3620  GNUNET_NO);
3621  queue->idle = GNUNET_NO;
3622  return;
3623  }
3624  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3625  {
3627  "Transmission throttled due to communicator queue length limit\n");
3629  "# Transmission throttled due to queue queue limit",
3630  1,
3631  GNUNET_NO);
3632  queue->idle = GNUNET_NO;
3633  return;
3634  }
3635  if (0 == queue->q_capacity)
3636  {
/* NOTE(review): q_capacity is declared `uint64_t` in struct Queue; "%lu"
   only matches where unsigned long is 64 bits wide.  A cast to
   unsigned long long with "%llu" (as done for qid at the end of this
   function) or PRIu64 would be portable. */
3638  "Transmission throttled due to communicator message queue qid %u has capacity %lu.\n",
3639  queue->qid,
3640  queue->q_capacity);
3642  "# Transmission throttled due to message queue capacity",
3643  1,
3644  GNUNET_NO);
3645  queue->idle = GNUNET_NO;
3646  return;
3647  }
3648  /* queue might indeed be ready, schedule it */
3649  if (NULL != queue->transmit_task)
3650  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3651  queue->transmit_task =
3653  queue);
3655  "Considering transmission on queue `%s' QID %llu to %s\n",
3656  queue->address,
3657  (unsigned long long) queue->qid,
3658  GNUNET_i2s (&queue->neighbour->pid));
3659 }
3660 
3661 
/**
 * Scheduler task that re-evaluates whether a virtual link is still usable.
 *
 * It takes the latest `path_valid_until` over the link's distance-vector
 * hops and the latest `validated_until` over the neighbour's queues.  A side
 * whose latest expiry has already passed is detached from the link.  When
 * neither a neighbour nor a DV route remains the link is freed; otherwise
 * the task re-arms itself for the later of the two expiry times.
 *
 * NOTE(review): two lines of this function are missing from this listing:
 * the head of the logging call and one statement just before
 * free_virtual_link().
 *
 * @param cls the `struct VirtualLink` to check
 */
3668 static void
3669 check_link_down (void *cls)
3670 {
3671  struct VirtualLink *vl = cls;
3672  struct DistanceVector *dv = vl->dv;
3673  struct Neighbour *n = vl->n;
3674  struct GNUNET_TIME_Absolute dvh_timeout;
3675  struct GNUNET_TIME_Absolute q_timeout;
3676 
3678  "Checking if link is down\n");
/* This task is running now, so the stored handle is stale. */
3679  vl->visibility_task = NULL;
3680  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3681  if (NULL != dv)
3682  {
/* Latest validity over all hops of the DV route. */
3683  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3684  pos = pos->next_dv)
3685  dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3686  pos->path_valid_until);
3687  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3688  {
/* No hop is valid any more: detach the route from the link (the route
   itself is not freed here). */
3689  vl->dv->vl = NULL;
3690  vl->dv = NULL;
3691  }
3692  }
3693  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
/* NOTE(review): `n` (vl->n) is dereferenced here, and vl->n again in the
   branch below, without a NULL check, whereas `dv` is guarded above and
   vl->n is compared against NULL a few lines further down.
   free_neighbour() sets vl->n = NULL and has a separate branch for links
   that still have a DV route (its body is not visible in this listing);
   if this task can run for such a link it would crash here.  Confirm that
   every caller guarantees vl->n != NULL, or guard this section. */
3694  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3695  q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3696  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3697  {
/* No queue is validated any more: detach the neighbour from the link. */
3698  vl->n->vl = NULL;
3699  vl->n = NULL;
3700  }
3701  if ((NULL == vl->n) && (NULL == vl->dv))
3702  {
3704  free_virtual_link (vl);
3705  return;
3706  }
/* Something still backs the link: look again at the later expiry time. */
3707  vl->visibility_task =
3708  GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3709  &check_link_down,
3710  vl);
3711 }
3712 
3713 
3719 static void
3721 {
3722  struct Neighbour *neighbour = queue->neighbour;
3723  struct TransportClient *tc = queue->tc;
3724  struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3726  struct QueueEntry *qe;
3727  int maxxed;
3728  struct PendingAcknowledgement *pa;
3729  struct VirtualLink *vl;
3730 
3732  "Cleaning up queue %u\n", queue->qid);
3733  if (NULL != queue->transmit_task)
3734  {
3735  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3736  queue->transmit_task = NULL;
3737  }
3738  while (NULL != (pa = queue->pa_head))
3739  {
3740  GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3741  pa->queue = NULL;
3742  }
3743 
3744  GNUNET_CONTAINER_MDLL_remove (neighbour,
3745  neighbour->queue_head,
3746  neighbour->queue_tail,
3747  queue);
3749  tc->details.communicator.queue_head,
3750  tc->details.communicator.queue_tail,
3751  queue);
3752  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <=
3753  tc->details.communicator.
3754  total_queue_length);
3755  while (NULL != (qe = queue->queue_head))
3756  {
3757  GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3758  queue->queue_length--;
3759  tc->details.communicator.total_queue_length--;
3760  if (NULL != qe->pm)
3761  {
3762  GNUNET_assert (qe == qe->pm->qe);
3763  qe->pm->qe = NULL;
3764  }
3765  GNUNET_free (qe);
3766  }
3767  GNUNET_assert (0 == queue->queue_length);
3768  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3769  tc->details.communicator.total_queue_length))
3770  {
3771  /* Communicator dropped below threshold, resume all _other_ queues */
3773  GST_stats,
3774  "# Transmission throttled due to communicator queue limit",
3775  -1,
3776  GNUNET_NO);
3777  for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3778  s = s->next_client)
3780  s,
3782  }
3783  notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3784  GNUNET_free (queue);
3785 
3786  vl = lookup_virtual_link (&neighbour->pid);
3787  if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n))
3788  {
3790  check_link_down (vl);
3791  }
3792  if (NULL == neighbour->queue_head)
3793  {
3794  free_neighbour (neighbour);
3795  }
3796 }
3797 
3798 
3804 static void
3806 {
3807  struct TransportClient *tc = ale->tc;
3808 
3809  GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
3810  tc->details.communicator.addr_tail,
3811  ale);
3812  if (NULL != ale->sc)
3813  {
3815  "store cancel\n");
3817  ale->sc = NULL;
3818  }
3819  if (NULL != ale->st)
3820  {
3821  GNUNET_SCHEDULER_cancel (ale->st);
3822  ale->st = NULL;
3823  }
3824  GNUNET_free (ale);
3825 }
3826 
3827 
3836 static int
3838  const struct GNUNET_PeerIdentity *pid,
3839  void *value)
3840 {
3841  struct TransportClient *tc = cls;
3842  struct PeerRequest *pr = value;
3843 
3845  pr->wc = NULL;
3846  GNUNET_assert (
3847  GNUNET_YES ==
3848  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
3849  pid,
3850  pr));
3851  GNUNET_free (pr);
3852 
3853  return GNUNET_OK;
3854 }
3855 
3856 
/**
 * Forward declaration; the definition is outside the visible part of this
 * file.  The client-disconnect code right below calls it once `in_shutdown`
 * is set and the client list has become empty.
 *
 * @param cls closure (the disconnect code passes its own closure through)
 */
3857 static void
3858 do_shutdown (void *cls);
3859 
3868 static void
3870  struct GNUNET_SERVICE_Client *client,
3871  void *app_ctx)
3872 {
3873  struct TransportClient *tc = app_ctx;
3874 
3875  (void) cls;
3876  (void) client;
3878  switch (tc->type)
3879  {
3880  case CT_NONE:
3882  "Unknown Client %p disconnected, cleaning up.\n",
3883  tc);
3884  break;
3885 
3886  case CT_CORE: {
3888  "CORE Client %p disconnected, cleaning up.\n",
3889  tc);
3890 
3891  struct PendingMessage *pm;
3892 
3893  while (NULL != (pm = tc->details.core.pending_msg_head))
3894  {
3896  tc->details.core.pending_msg_head,
3897  tc->details.core.pending_msg_tail,
3898  pm);
3899  pm->client = NULL;
3900  }
3901  }
3902  break;
3903 
3904  case CT_MONITOR:
3906  "MONITOR Client %p disconnected, cleaning up.\n",
3907  tc);
3908 
3909  break;
3910 
3911  case CT_COMMUNICATOR: {
3913  "COMMUNICATOR Client %p disconnected, cleaning up.\n",
3914  tc);
3915 
3916  struct Queue *q;
3917  struct AddressListEntry *ale;
3918 
3919  while (NULL != (q = tc->details.communicator.queue_head))
3920  free_queue (q);
3921  while (NULL != (ale = tc->details.communicator.addr_head))
3923  GNUNET_free (tc->details.communicator.address_prefix);
3924  }
3925  break;
3926 
3927  case CT_APPLICATION:
3929  "APPLICATION Client %p disconnected, cleaning up.\n",
3930  tc);
3931 
3932  GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
3934  tc);
3935  GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
3936  break;
3937  }
3938  GNUNET_free (tc);
3939  if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
3940  {
3942  "Our last client disconnected\n");
3943  do_shutdown (cls);
3944  }
3945 }
3946 
3947 
3957 static int
3959  const struct GNUNET_PeerIdentity *pid,
3960  void *value)
3961 {
3962  struct TransportClient *tc = cls;
3963 
3964  (void) value;
3966  "Telling new CORE client about existing connection to %s\n",
3967  GNUNET_i2s (pid));
3969  return GNUNET_OK;
3970 }
3971 
3972 
/**
 * Handle the start message of a client that registers as CORE.
 *
 * The client is dropped if bit 0 of the options is set and the identity it
 * supplied differs from ours, or if it already has a client type assigned.
 * Otherwise its type becomes CT_CORE.
 *
 * NOTE(review): the lines that follow the type change are partly missing
 * from this listing (the head of the log call and the calls around the
 * surviving `tc);` argument), so what happens after registration cannot be
 * documented from here.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param start the start message
 */
3981 static void
3982 handle_client_start (void *cls, const struct StartMessage *start)
3983 {
3984  struct TransportClient *tc = cls;
3985  uint32_t options;
3986 
3987  options = ntohl (start->options);
/* Bit 0 of the options requests that the claimed identity be verified. */
3988  if ((0 != (1 & options)) &&
3989  (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
3990  {
3991  /* client thinks this is a different peer, reject */
3992  GNUNET_break (0);
3993  GNUNET_SERVICE_client_drop (tc->client);
3994  return;
3995  }
/* A client may register a type only once. */
3996  if (CT_NONE != tc->type)
3997  {
3998  GNUNET_break (0);
3999  GNUNET_SERVICE_client_drop (tc->client);
4000  return;
4001  }
4002  tc->type = CT_CORE;
4004  "New CORE client with PID %s registered\n",
4005  GNUNET_i2s (&start->self));
4008  tc);
4010 }
4011 
4012 
4019 static int
4020 check_client_send (void *cls, const struct OutboundMessage *obm)
4021 {
4022  struct TransportClient *tc = cls;
4023  uint16_t size;
4024  const struct GNUNET_MessageHeader *obmm;
4025 
4026  if (CT_CORE != tc->type)
4027  {
4028  GNUNET_break (0);
4029  return GNUNET_SYSERR;
4030  }
4031  size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4032  if (size < sizeof(struct GNUNET_MessageHeader))
4033  {
4034  GNUNET_break (0);
4035  return GNUNET_SYSERR;
4036  }
4037  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4038  if (size != ntohs (obmm->size))
4039  {
4040  GNUNET_break (0);
4041  return GNUNET_SYSERR;
4042  }
4043  return GNUNET_OK;
4044 }
4045 
4046 
/**
 * Report the outcome of a pending message to the client that submitted it.
 *
 * If the pending message still references a client, a SendOkMessage naming
 * the virtual link's target peer is queued on that client's message queue.
 *
 * NOTE(review): the signature line (listing line 4055) and listing lines
 * 4060, 4067, 4069 and 4075 are absent from this extract; `pm` is the
 * parameter used throughout but its declaration is not visible. The name
 * is presumed from the log text "client send response" — confirm.
 */
4054 static void
4056 {
4057  struct TransportClient *tc = pm->client;
4058  struct VirtualLink *vl = pm->vl;
4059 
4061  "client send response\n");
/* Nothing is sent when the pending message has no client attached. */
4062  if (NULL != tc)
4063  {
4064  struct GNUNET_MQ_Envelope *env;
4065  struct SendOkMessage *so_msg;
4066 
4068  so_msg->peer = vl->target;
4070  "Confirming transmission of <%llu> to %s\n",
4071  pm->logging_uuid,
4072  GNUNET_i2s (&vl->target));
4073  GNUNET_MQ_send (tc->mq, env);
4074  }
4076 }
4077 
4078 
/**
 * Select distance-vector hops into @a hops_array, giving each eligible hop
 * a weight of (MAX_DV_HOPS_ALLOWED - distance) so shorter paths are
 * favoured.
 *
 * NOTE(review): the listing lines carrying the function name and the first
 * parameters (4089-4090) and the right-hand side of the `choices[i]`
 * assignment (4128) are absent from this extract; `dv` and `options` are
 * used below but their declarations are not visible.
 *
 * @param hops_array output array receiving the selected hops
 * @param hops_array_length capacity of @a hops_array
 * @return number of entries stored in @a hops_array
 */
4088 static unsigned int
4091  struct DistanceVectorHop **hops_array,
4092  unsigned int hops_array_length)
4093 {
4094  uint64_t choices[hops_array_length];
4095  uint64_t num_dv;
4096  unsigned int dv_count;
4097 
4098  /* Pick random vectors, but weighted by distance, giving more weight
4099  to shorter vectors */
4100  num_dv = 0;
4101  dv_count = 0;
/* First pass: count eligible hops and sum up their weights. */
4102  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4103  pos = pos->next_dv)
4104  {
4105  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4106  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4107  .rel_value_us == 0))
4108  continue; /* pos unconfirmed and confirmed required */
4109  num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4110  dv_count++;
4111  }
4112  if (0 == dv_count)
4113  return 0;
/* NOTE(review): this shortcut copies EVERY hop on the list without
   re-applying the filter used while counting above. When hops were
   filtered out, it returns hops the caller excluded, and if the unfiltered
   list is longer than hops_array_length it writes past hops_array. */
4114  if (dv_count <= hops_array_length)
4115  {
4116  dv_count = 0;
4117  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4118  pos = pos->next_dv)
4119  hops_array[dv_count++] = pos;
4120  return dv_count;
4121  }
/* Draw hops_array_length pairwise distinct values into choices[];
   a draw is repeated until it differs from all earlier ones. */
4122  for (unsigned int i = 0; i < hops_array_length; i++)
4123  {
4124  int ok = GNUNET_NO;
4125  while (GNUNET_NO == ok)
4126  {
4127  choices[i] =
4129  ok = GNUNET_YES;
4130  for (unsigned int j = 0; j < i; j++)
4131  if (choices[i] == choices[j])
4132  {
4133  ok = GNUNET_NO;
4134  break;
4135  }
4136  }
4137  }
4138  dv_count = 0;
4139  num_dv = 0;
/* Second pass: each eligible hop owns the half-open weight interval
   [num_dv, num_dv + delta); a hop is stored once per choice that falls
   into its interval.
   NOTE(review): two distinct choices can land in the same interval, in
   which case the same hop is stored twice. */
4140  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4141  pos = pos->next_dv)
4142  {
4143  uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4144 
4145  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4146  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4147  .rel_value_us == 0))
4148  continue; /* pos unconfirmed and confirmed required */
4149  for (unsigned int i = 0; i < hops_array_length; i++)
4150  if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4151  hops_array[dv_count++] = pos;
4152  num_dv += delta;
4153  }
4154  return dv_count;
4155 }
4156 
4157 
/**
 * Validate a CommunicatorAvailableMessage and, as a side effect, mark the
 * sending client as CT_COMMUNICATOR.
 *
 * The message is rejected if the client already has a type. A message with
 * no bytes after the fixed header is accepted as a receive-only
 * communicator.
 *
 * NOTE(review): the function-name line (listing line 4165) and listing
 * line 4181 are absent from this extract; whatever validation of the
 * trailing bytes happens on 4181 is not visible here.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param cam the message to validate
 * @return #GNUNET_OK if accepted, #GNUNET_SYSERR on error
 */
4164 static int
4166  void *cls,
4167  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4168 {
4169  struct TransportClient *tc = cls;
4170  uint16_t size;
4171 
4172  if (CT_NONE != tc->type)
4173  {
4174  GNUNET_break (0);
4175  return GNUNET_SYSERR;
4176  }
/* The client type is assigned here, in the check function. */
4177  tc->type = CT_COMMUNICATOR;
4178  size = ntohs (cam->header.size) - sizeof(*cam);
4179  if (0 == size)
4180  return GNUNET_OK; /* receive-only communicator */
4182  return GNUNET_OK;
4183 }
4184 
4185 
/**
 * Finish processing of an incoming communicator message and release its
 * context.
 *
 * When the message had `fc_on` set, an IncomingMessageAck carrying the
 * message's `fc_id` and `sender` is queued to the communicator's message
 * queue. The context is freed in every case, so the caller must not touch
 * it afterwards.
 *
 * NOTE(review): the signature line (listing line 4192) and listing lines
 * 4200 and 4206 are absent from this extract; `cmc` is the parameter used
 * below but its declaration is not visible.
 */
4191 static void
4193 {
4194  if (0 != ntohl (cmc->im.fc_on))
4195  {
4196  /* send ACK when done to communicator for flow control! */
4197  struct GNUNET_MQ_Envelope *env;
4198  struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4199 
4201  ack->reserved = htonl (0);
4202  ack->fc_id = cmc->im.fc_id;
4203  ack->sender = cmc->im.sender;
4204  GNUNET_MQ_send (cmc->tc->mq, env);
4205  }
4207  GNUNET_free (cmc);
4208 }
4209 
4210 
/**
 * Handle a RECV_OK message with which a CORE client enlarges its receive
 * window for a peer.
 *
 * Non-CORE clients are dropped. If no confirmed virtual link exists for
 * the peer, the event is only counted. Otherwise the window grows by the
 * delta from the message and, once it is positive, every
 * CommunicatorMessageContext queued on the link is finished.
 *
 * NOTE(review): listing lines 4237, 4241 and 4251 are absent from this
 * extract (the opener of the statistics call, the statement before the
 * early `return`, and the first statement of the loop body).
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param rom the RECV_OK message; `peer` and `increase_window_delta` are read
 */
4220 static void
4221 handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4222 {
4223  struct TransportClient *tc = cls;
4224  struct VirtualLink *vl;
4225  uint32_t delta;
4226  struct CommunicatorMessageContext *cmc;
4227 
4228  if (CT_CORE != tc->type)
4229  {
4230  GNUNET_break (0);
4231  GNUNET_SERVICE_client_drop (tc->client);
4232  return;
4233  }
4234  vl = lookup_virtual_link (&rom->peer);
4235  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4236  {
4238  "# RECV_OK dropped: virtual link unknown",
4239  1,
4240  GNUNET_NO);
4242  return;
4243  }
4244  delta = ntohl (rom->increase_window_delta);
4245  vl->core_recv_window += delta;
/* NOTE(review): `<= 0` is only meaningful if core_recv_window is a signed
   type; its declaration is not visible here — confirm. */
4246  if (vl->core_recv_window <= 0)
4247  return;
4248  /* resume communicators */
/* NOTE(review): the loop only terminates if the missing line 4251 detaches
   cmc from the list headed by vl->cmc_tail — confirm. */
4249  while (NULL != (cmc = vl->cmc_tail))
4250  {
4252  finish_cmc_handling (cmc);
4253  }
4254 }
4255 
4256 
/**
 * Record the properties a communicator announced about itself.
 *
 * A message without trailing bytes denotes a receive-only communicator and
 * is only logged. Otherwise the trailing string is duplicated as the
 * communicator's address prefix and its characteristics are stored.
 *
 * NOTE(review): the function-name line (listing line 4264) and listing
 * lines 4274, 4282 and 4285 are absent from this extract, so the log call
 * openers and the statement that concludes the handler are not visible.
 *
 * @param cls the `struct TransportClient` of the communicator
 * @param cam the announcement; the address prefix follows the fixed part
 */
4263 static void
4265  void *cls,
4266  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4267 {
4268  struct TransportClient *tc = cls;
4269  uint16_t size;
4270 
4271  size = ntohs (cam->header.size) - sizeof(*cam);
4272  if (0 == size)
4273  {
4275  "Receive-only communicator connected\n");
4276  return; /* receive-only communicator */
4277  }
/* The prefix is copied; it is read as a C string starting right after the
   fixed-size part of the message. */
4278  tc->details.communicator.address_prefix =
4279  GNUNET_strdup ((const char *) &cam[1]);
4280  tc->details.communicator.cc =
4281  (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
4283  "Communicator with prefix `%s' connected\n",
4284  tc->details.communicator.address_prefix);
4286 }
4287 
4288 
/**
 * Validate a CommunicatorBackchannel message: an embedded message followed
 * by a non-empty, 0-terminated string.
 *
 * NOTE(review): the function-name line (listing line 4297) is absent from
 * this extract.
 *
 * @param cls unused
 * @param cb the message to validate
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
4296 static int
4298  void *cls,
4299  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4300 {
4301  const struct GNUNET_MessageHeader *inbox;
4302  const char *is;
4303  uint16_t msize;
4304  uint16_t isize;
4305 
4306  (void) cls;
4307  msize = ntohs (cb->header.size) - sizeof(*cb);
4308  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
/* NOTE(review): inbox->size is read before anything verifies that msize
   covers a full GNUNET_MessageHeader, and isize itself is never checked
   against sizeof(struct GNUNET_MessageHeader) — confirm this is safe. */
4309  isize = ntohs (inbox->size);
/* `>=` rather than `>`: at least one byte must remain for the string. */
4310  if (isize >= msize)
4311  {
4312  GNUNET_break (0);
4313  return GNUNET_SYSERR;
4314  }
4315  is = (const char *) inbox;
4316  is += isize;
4317  msize -= isize;
4318  GNUNET_assert (0 < msize);
/* Only the final byte is inspected; it must be the terminator. */
4319  if ('\0' != is[msize - 1])
4320  {
4321  GNUNET_break (0);
4322  return GNUNET_SYSERR;
4323  }
4324  return GNUNET_OK;
4325 }
4326 
4327 
/**
 * Refresh the ephemeral key material stored in a distance vector and sign
 * an EphemeralConfirmationPS over the target and the ephemeral key, leaving
 * the signature in `dv->sender_sig`.
 *
 * NOTE(review): most of this function is absent from the extract — the
 * signature line (4335), the condition of the early return (4340), and
 * listing lines 4342, 4344-4347, 4350 and 4352. Only the assignments below
 * are visible; the early-return condition and how `ephemeral_validity` is
 * computed cannot be determined from here.
 */
4334 static void
4336 {
4337  struct EphemeralConfirmationPS ec;
4338 
4339  if (0 !=
4341  return;
4343  dv->ephemeral_validity =
4348  ec.target = dv->target;
4349  ec.ephemeral_key = dv->ephemeral_key;
4351  ec.purpose.size = htonl (sizeof(ec));
4353  &ec,
4354  &dv->sender_sig);
4355 }
4356 
4357 
/**
 * Hand @a payload to the communicator owning a queue for transmission.
 *
 * Builds a GNUNET_TRANSPORT_SendMessageTo envelope holding a copy of the
 * payload, records a QueueEntry for it on the queue (linked to @a pm when
 * one is given), updates the queue's length/capacity/idle bookkeeping and
 * sends the envelope to the communicator's message queue.
 *
 * NOTE(review): the line carrying the function name and the `queue`
 * parameter (listing line 4368) and listing lines 4378, 4386, 4421, 4426
 * and 4433 are absent from this extract.
 *
 * @param pm pending message this transmission belongs to, may be NULL
 * @param payload bytes to transmit; also read as a GNUNET_MessageHeader
 *        for logging
 * @param payload_size number of bytes in @a payload
 */
4367 static void
4369  struct PendingMessage *pm,
4370  const void *payload,
4371  size_t payload_size)
4372 {
4373  struct Neighbour *n = queue->neighbour;
4374  struct GNUNET_TRANSPORT_SendMessageTo *smt;
4375  struct GNUNET_MQ_Envelope *env;
4376 
4377  GNUNET_log (
4379  "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
4380  (unsigned int) payload_size,
4381  (NULL == pm) ? 0 : pm->logging_uuid,
4382  (unsigned long long) queue->qid,
4383  GNUNET_i2s (&queue->neighbour->pid));
4384  env = GNUNET_MQ_msg_extra (smt,
4385  payload_size,
4387  smt->qid = queue->qid;
/* The message id is taken here and post-incremented below when the queue
   entry is filled in, so both carry the same value. */
4388  smt->mid = queue->mid_gen;
4389  smt->receiver = n->pid;
4390  memcpy (&smt[1], payload, payload_size);
4391  {
4392  /* Pass the env to the communicator of queue for transmission. */
4393  struct QueueEntry *qe;
4394 
4395  qe = GNUNET_new (struct QueueEntry);
4396  qe->mid = queue->mid_gen++;
4397  qe->queue = queue;
4398  if (NULL != pm)
4399  {
4400  qe->pm = pm;
4401  // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4402  // GNUNET_assert (NULL == pm->qe);
4403  /*if (NULL != pm->qe)
4404  {
4405  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4406  "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4407  pm->logging_uuid,
4408  (unsigned long long) pm->qe->mid);
4409  pm->qe->pm = NULL;
4410  }*/
/* Any previous pm->qe link is overwritten without being cleared. */
4411  pm->qe = qe;
4412  }
4413  GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4414  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4415  queue->queue_length++;
4416  queue->tc->details.communicator.total_queue_length++;
/* NOTE(review): on this early return the entry has already been linked
   and counted, but `env` is neither sent nor visibly discarded — confirm
   whether the envelope leaks and whether the entry is ever completed. */
4417  if (0 == queue->q_capacity)
4418  return;
4419  if (GNUNET_NO == queue->unlimited_length)
4420  queue->q_capacity--;
4422  "Queue %s with qid %u has capacity %lu\n",
4423  queue->address,
4424  queue->qid,
4425  queue->q_capacity);
/* The queue stops being idle once any of the conditions below holds. */
4427  queue->tc->details.communicator.total_queue_length)
4428  queue->idle = GNUNET_NO;
4429  if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4430  queue->idle = GNUNET_NO;
4431  if (0 == queue->q_capacity)
4432  queue->idle = GNUNET_NO;
/* NOTE(review): payload_size is a size_t but is formatted with %u below,
   unlike the first log call which casts it to unsigned int. */
4434  "Sending message MID %llu of type %u (%u) and size %u with MQ %p\n",
4435  smt->mid,
4436  ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4437  ntohs (smt->header.size),
4438  payload_size,
4439  queue->tc->mq);
4440  GNUNET_MQ_send (queue->tc->mq, env);
4441  }
4442 }
4443 
4444 
/**
 * Send @a hdr to neighbour @a n over one or (with RMO_REDUNDANT) two of its
 * queues and report the smallest aged RTT among the queues used.
 *
 * A queue is a candidate if RMO_UNCONFIRMED_ALLOWED is set or its
 * validation has not yet expired.
 *
 * NOTE(review): listing lines 4458 (the `options` parameter), 4484, 4488,
 * 4492, 4495-4496, 4500 and 4510 are absent from this extract. The value
 * returned when there are no candidates, the initial value of `rtt`, and
 * how `sel1`/`sel2` are drawn are therefore not visible here.
 *
 * @param n neighbour to route through
 * @param hdr message to send; its `size` field gives the length
 * @return minimum of the initial `rtt` and the aged RTT of each queue used
 */
4455 static struct GNUNET_TIME_Relative
4456 route_via_neighbour (const struct Neighbour *n,
4457  const struct GNUNET_MessageHeader *hdr,
4459 {
4460  struct GNUNET_TIME_Absolute now;
4461  unsigned int candidates;
4462  unsigned int sel1;
4463  unsigned int sel2;
4464  struct GNUNET_TIME_Relative rtt;
4465 
4466  /* Pick one or two 'random' queues from n (under constraints of options) */
4467  now = GNUNET_TIME_absolute_get ();
4468  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4469  weight in the future; weight could be assigned by observed
4470  bandwidth (note: not sure if we should do this for this type
4471  of control traffic though). */
4472  candidates = 0;
/* First pass: count the queues that are acceptable under options. */
4473  for (struct Queue *pos = n->queue_head; NULL != pos;
4474  pos = pos->next_neighbour)
4475  {
4476  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4477  (pos->validated_until.abs_value_us > now.abs_value_us))
4478  candidates++;
4479  }
4480  if (0 == candidates)
4481  {
4482  /* This can happen rarely if the last confirmed queue timed
4483  out just as we were beginning to process this message. */
4485  "Could not route message of type %u to %s: no valid queue\n",
4486  ntohs (hdr->type),
4487  GNUNET_i2s (&n->pid));
4489  "# route selection failed (all no valid queue)",
4490  1,
4491  GNUNET_NO);
4493  }
4494 
/* An index equal to the candidate count can never match in the loop
   below, so without RMO_REDUNDANT no second queue is used. */
4497  if (0 == (options & RMO_REDUNDANT))
4498  sel2 = candidates; /* picks none! */
4499  else
4501  candidates = 0;
/* Second pass: walk the same candidates in the same order; `candidates`
   now serves as the running index compared against sel1 and sel2. */
4502  for (struct Queue *pos = n->queue_head; NULL != pos;
4503  pos = pos->next_neighbour)
4504  {
4505  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4506  (pos->validated_until.abs_value_us > now.abs_value_us))
4507  {
4508  if ((sel1 == candidates) || (sel2 == candidates))
4509  {
4511  "Routing message of type %u to %s using %s (#%u)\n",
4512  ntohs (hdr->type),
4513  GNUNET_i2s (&n->pid),
4514  pos->address,
4515  (sel1 == candidates) ? 1 : 2);
4516  rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4517  queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4518  }
4519  candidates++;
4520  }
4521  }
4522  return rtt;
4523 }
4524 
4525 
/**
 * Cipher handle plus derived key material for one distance-vector message.
 *
 * NOTE(review): the line opening this aggregate (before listing line 4530),
 * the first member of the nested struct (4541-4544) and the line closing
 * the nested struct (4555) are absent from this extract. Other code in
 * this file refers to it as `struct DVKeyState` and accesses
 * `material.hmac_key`, `material.aes_key` and `material.aes_ctr`.
 */
4530 {
/* libgcrypt cipher handle. */
4534  gcry_cipher_hd_t cipher;
4535 
/* Derived key material. */
4539  struct
4540  {
4545 
/* 256 bits of AES key. */
4549  char aes_key[256 / 8];
4550 
/* 128 bits of counter/IV for CTR mode. */
4554  char aes_ctr[128 / 8];
4556 };
4557 
4558 
/**
 * Fill a DVKeyState from key material @a km and @a iv.
 *
 * The material is derived with GNUNET_CRYPTO_kdf using the label
 * "transport-backchannel-key"; an AES-256 cipher in CTR mode is then
 * opened and given the derived key and counter. Failures of the visible
 * libgcrypt open/setkey calls abort via GNUNET_assert.
 *
 * NOTE(review): the line with the function name and the `km` parameter
 * (listing line 4568) and listing lines 4573 and 4583 are absent from this
 * extract. The return value of gcry_cipher_setctr is not checked.
 *
 * @param iv initialization vector mixed into the derivation
 * @param key state to initialize; the caller releases the cipher later
 */
4567 static void
4569  const struct GNUNET_ShortHashCode *iv,
4570  struct DVKeyState *key)
4571 {
4572  /* must match #dh_key_derive_eph_pub */
4574  GNUNET_CRYPTO_kdf (&key->material,
4575  sizeof(key->material),
4576  "transport-backchannel-key",
4577  strlen ("transport-backchannel-key"),
4578  km,
4579  sizeof(*km),
4580  iv,
4581  sizeof(*iv),
4582  NULL));
4584  "Deriving backchannel key based on KM %s and IV %s\n",
4585  GNUNET_h2s (km),
4586  GNUNET_sh2s (iv));
4587  GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4588  GCRY_CIPHER_AES256 /* low level: go for speed */,
4589  GCRY_CIPHER_MODE_CTR,
4590  0 /* flags */));
4591  GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4592  &key->material.aes_key,
4593  sizeof(key->material.aes_key)));
4594  gcry_cipher_setctr (key->cipher,
4595  &key->material.aes_ctr,
4596  sizeof(key->material.aes_ctr));
4597 }
4598 
4599 
/**
 * Derive a DVKeyState from our ephemeral private key and the target
 * peer's public key, then initialize @a key from the resulting material
 * via dv_setup_key_state_from_km().
 *
 * NOTE(review): the function-name line (listing line 4610) and the opener
 * of the key-exchange call (4618) are absent from this extract; which
 * primitive produces `km` is not visible here.
 *
 * @param priv_ephemeral our ephemeral private key
 * @param target peer whose public key is combined with @a priv_ephemeral
 * @param iv initialization vector passed on to the key setup
 * @param key state to initialize
 */
4609 static void
4611  const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
4612  const struct GNUNET_PeerIdentity *target,
4613  const struct GNUNET_ShortHashCode *iv,
4614  struct DVKeyState *key)
4615 {
4616  struct GNUNET_HashCode km;
4617 
4619  &target->public_key,
4620  &km));
4621  dv_setup_key_state_from_km (&km, iv, key);
4622 }
4623 
4624 
/**
 * Derive a DVKeyState from a received ephemeral public key, then
 * initialize @a key from the resulting material via
 * dv_setup_key_state_from_km().
 *
 * NOTE(review): the line with the function name and the `pub_ephemeral`
 * parameter (listing line 4635) and the opener of the key-exchange call
 * (4641) are absent from this extract; which private key and primitive are
 * used is not visible here.
 *
 * @param iv initialization vector passed on to the key setup
 * @param key state to initialize
 */
4634 static void
4636  const struct GNUNET_ShortHashCode *iv,
4637  struct DVKeyState *key)
4638 {
4639  struct GNUNET_HashCode km;
4640 
4642  pub_ephemeral,
4643  &km));
4644  dv_setup_key_state_from_km (&km, iv, key);
4645 }
4646 
4647 
4657 static void
4658 dv_hmac (const struct DVKeyState *key,
4659  struct GNUNET_HashCode *hmac,
4660  const void *data,
4661  size_t data_size)
4662 {
4663  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4664 }
4665 
4666 
4676 static void
4677 dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4678 {
4679  GNUNET_assert (0 ==
4680  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4681 }
4682 
4683 
/**
 * Decrypt a buffer with the cipher held in a DVKeyState.
 *
 * Input and output have the same length, @a out_size. Any libgcrypt error
 * aborts via GNUNET_assert.
 *
 * NOTE(review): the line with the function name and the `key` parameter
 * (listing line 4694) is absent from this extract.
 *
 * @param[out] out buffer of at least @a out_size bytes for the plaintext
 * @param ciph ciphertext to decrypt
 * @param out_size number of bytes to process
 */
4693 static void
4695  void *out,
4696  const void *ciph,
4697  size_t out_size)
4698 {
4699  GNUNET_assert (
4700  0 == gcry_cipher_decrypt (key->cipher, out, out_size, ciph, out_size));
4701 }
4702 
4703 
/**
 * Release the resources held by a DVKeyState: close the cipher handle and
 * wipe the derived key material. The structure itself is not freed here.
 *
 * NOTE(review): the signature line (listing line 4710) is absent from this
 * extract; `key` is the parameter used below.
 */
4709 static void
4711 {
4712  gcry_cipher_close (key->cipher);
4713  GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
4714 }
4715 
4716 
/**
 * Signature of the callback that receives a fully encapsulated DV message
 * together with the neighbour that is the first hop of its path.
 *
 * NOTE(review): the final line of this typedef (listing line 4730) is
 * absent from this extract. Callers in this file pass a fourth argument
 * named `options` after @a hdr.
 *
 * @param cls closure
 * @param next_hop neighbour to hand the message to
 * @param hdr the encapsulated message
 */
4727 typedef void (*DVMessageHandler) (void *cls,
4728  struct Neighbour *next_hop,
4729  const struct GNUNET_MessageHeader *hdr,
4731 
/**
 * Encrypt @a hdr once for a distance-vector target and hand one boxed copy
 * per selected path to @a use.
 *
 * The payload header and message body are encrypted and authenticated a
 * single time; for every path the function then assembles box header,
 * hop list and ciphertext in a stack buffer and invokes the callback with
 * that path's next hop.
 *
 * NOTE(review): the line with the function name and the `dv` parameter
 * (listing line 4747), the `options` parameter (4753) and listing lines
 * 4767, 4774, 4787, 4813 and 4822 are absent from this extract. The
 * initial value of `rtt` is presumably set on 4787 — confirm.
 *
 * @param num_dvhs number of entries in @a dvhs
 * @param dvhs paths to send the message over
 * @param hdr message to encapsulate
 * @param use callback invoked once per path
 * @param use_cls closure for @a use
 * @param without_fc stored (via htons) in the box header's `without_fc`
 * @return minimum of the initial `rtt` and each used path's aged RTT
 */
4746 static struct GNUNET_TIME_Relative
4748  unsigned int num_dvhs,
4749  struct DistanceVectorHop **dvhs,
4750  const struct GNUNET_MessageHeader *hdr,
4751  DVMessageHandler use,
4752  void *use_cls,
4754  enum GNUNET_GenericReturnValue without_fc)
4755 {
4756  struct TransportDVBoxMessage box_hdr;
4757  struct TransportDVBoxPayloadP payload_hdr;
4758  uint16_t enc_body_size = ntohs (hdr->size);
/* Variable-length stack buffer: payload header followed by the body. */
4759  char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
4760  struct TransportDVBoxPayloadP *enc_payload_hdr =
4761  (struct TransportDVBoxPayloadP *) enc;
4762  struct DVKeyState *key;
4763  struct GNUNET_TIME_Relative rtt;
4764 
/* Allocated once here; see the review note on the release further down. */
4765  key = GNUNET_new (struct DVKeyState);
4766  /* Encrypt payload */
4768  box_hdr.total_hops = htons (0);
4769  box_hdr.without_fc = htons (without_fc);
4770  update_ephemeral (dv);
4771  box_hdr.ephemeral_key = dv->ephemeral_key;
4772  payload_hdr.sender_sig = dv->sender_sig;
4773 
4775  &box_hdr.iv,
4776  sizeof(box_hdr.iv));
4777  dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, key);
4778  payload_hdr.sender = GST_my_identity;
4779  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
/* Header and body go through the same cipher state one after the other,
   so the order of these two calls matters. */
4780  dv_encrypt (key, &payload_hdr, enc_payload_hdr, sizeof(payload_hdr));
4781  dv_encrypt (key,
4782  hdr,
4783  &enc[sizeof(struct TransportDVBoxPayloadP)],
4784  enc_body_size);
/* The HMAC covers the ciphertext (encrypt-then-MAC). */
4785  dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
4786  dv_key_clean (key);
4788  /* For each selected path, take the pre-computed header and body
4789  and add the path in the middle of the message; then send it. */
4790  for (unsigned int i = 0; i < num_dvhs; i++)
4791  {
4792  struct DistanceVectorHop *dvh = dvhs[i];
/* The target is appended to the path, hence one extra hop. */
4793  unsigned int num_hops = dvh->distance + 1;
4794  char buf[sizeof(struct TransportDVBoxMessage)
4795  + sizeof(struct GNUNET_PeerIdentity) * num_hops
4796  + sizeof(struct TransportDVBoxPayloadP)
4797  + enc_body_size] GNUNET_ALIGN;
4798  struct GNUNET_PeerIdentity *dhops;
4799 
/* NOTE(review): sizeof(buf) is narrowed to 16 bits by htons; nothing
   visible here bounds the total size to UINT16_MAX — confirm upstream
   callers guarantee it. */
4800  box_hdr.header.size = htons (sizeof(buf));
4801  box_hdr.orig_size = htons (sizeof(buf));
4802  box_hdr.num_hops = htons (num_hops);
4803  memcpy (buf, &box_hdr, sizeof(box_hdr));
4804  dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
4805  memcpy (dhops,
4806  dvh->path,
4807  dvh->distance * sizeof(struct GNUNET_PeerIdentity));
4808  dhops[dvh->distance] = dv->target;
4809  if (GNUNET_EXTRA_LOGGING > 0)
4810  {
4811  char *path;
4812 
4814  for (unsigned int j = 0; j < num_hops; j++)
4815  {
4816  char *tmp;
4817 
4818  GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
4819  GNUNET_free (path);
4820  path = tmp;
4821  }
4823  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
4824  ntohs (hdr->type),
4825  GNUNET_i2s (&dv->target),
4826  i + 1,
4827  num_dvhs,
4828  path);
4829  GNUNET_free (path);
4830  }
4831  rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
/* The ciphertext goes directly behind the hop list. */
4832  memcpy (&dhops[num_hops], enc, sizeof(enc));
4833  use (use_cls,
4834  dvh->next_hop,
4835  (const struct GNUNET_MessageHeader *) buf,
4836  options);
/* NOTE(review): `key` is allocated once before the loop but released on
   every iteration: with num_dvhs > 1 this frees the same pointer again,
   and with num_dvhs == 0 it is never freed. The release likely belongs
   right after dv_key_clean() above — confirm and fix. */
4837  GNUNET_free (key);
4838  }
4839  return rtt;
4840 }
4841 
4842 
/**
 * Callback matching the DVMessageHandler shape that forwards an
 * encapsulated DV message to the next hop via route_via_neighbour().
 *
 * The message is always routed with RMO_UNCONFIRMED_ALLOWED; no other
 * option value is passed on in the visible call, and the RTT returned by
 * route_via_neighbour() is discarded.
 *
 * NOTE(review): the line with the function name and the `cls` parameter
 * (listing line 4853) and the last parameter (4856) are absent from this
 * extract.
 *
 * @param next_hop neighbour to send to
 * @param hdr encapsulated message to send
 */
4852 static void
4854  struct Neighbour *next_hop,
4855  const struct GNUNET_MessageHeader *hdr,
4857 {
4858  (void) cls;
4859  (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
4860 }
4861 
4862 
/**
 * Route a control message to the target of a virtual link without going
 * through flow control, using the direct neighbour, a distance-vector
 * path, or both.
 *
 * When both are available and RMO_REDUNDANT is not set, one of the two is
 * discarded; when both remain, RMO_REDUNDANT is cleared because one direct
 * and one DV transmission already provide redundancy.
 *
 * NOTE(review): the active signature line (listing line 4875), the
 * `options` parameter (4878) and listing lines 4887, 4896, 4906, 4910,
 * 4914, 4918, 4920, 4929, 4937-4938, 4941, 4952, 4958, 4962 and 4968 are
 * absent from this extract. The name is taken from the commented-out
 * older signature below. The initial values of rtt1/rtt2 and the values
 * returned on the failure paths are not visible here.
 *
 * @param hdr message to route
 * @return minimum of rtt1 and rtt2 as computed below
 */
4874 static struct GNUNET_TIME_Relative
4876 // route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4877  const struct GNUNET_MessageHeader *hdr,
4879 {
4880  // struct VirtualLink *vl;
4881  struct Neighbour *n;
4882  struct DistanceVector *dv;
4883  struct GNUNET_TIME_Relative rtt1;
4884  struct GNUNET_TIME_Relative rtt2;
/* Address computation only; vl is not dereferenced until after the
   assertion below. */
4885  const struct GNUNET_PeerIdentity *target = &vl->target;
4886 
4888  "Trying to route message of type %u to %s without fc\n",
4889  ntohs (hdr->type),
4890  GNUNET_i2s (target));
4891 
4892  // TODO Do this elsewhere. vl should be given as parameter to method.
4893  // vl = lookup_virtual_link (target);
/* NOTE(review): the log call above already passes `target` (derived from
   vl) before this assertion, and the NULL test right after the assertion
   can never be true — the check looks like a leftover. */
4894  GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
4895  if (NULL == vl)
4897  n = vl->n;
4898  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
4899  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
4900  {
4901  /* if confirmed is required, and we do not have anything
4902  confirmed, drop respective options */
4903  if (NULL == n)
4904  n = lookup_neighbour (target);
4905  if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
4907  }
4908  if ((NULL == n) && (NULL == dv))
4909  {
4911  "Cannot route message of type %u to %s: no route\n",
4912  ntohs (hdr->type),
4913  GNUNET_i2s (target));
4915  "# Messages dropped in routing: no acceptable method",
4916  1,
4917  GNUNET_NO);
4919  }
4921  "Routing message of type %u to %s with options %X\n",
4922  ntohs (hdr->type),
4923  GNUNET_i2s (target),
4924  (unsigned int) options);
4925  /* If both dv and n are possible and we must choose:
4926  flip a coin for the choice between the two; for now 50/50 */
4927  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
4928  {
4930  n = NULL;
4931  else
4932  dv = NULL;
4933  }
4934  if ((NULL != n) && (NULL != dv))
4935  options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
4936  enough for redundancy, so clear the flag. */
4939  if (NULL != n)
4940  {
4942  "Try to route message of type %u to %s without fc via neighbour\n",
4943  ntohs (hdr->type),
4944  GNUNET_i2s (target));
4945  rtt1 = route_via_neighbour (n, hdr, options);
4946  }
4947  if (NULL != dv)
4948  {
/* Room for at most two paths: one normally, two with RMO_REDUNDANT. */
4949  struct DistanceVectorHop *hops[2];
4950  unsigned int res;
4951 
4953  options,
4954  hops,
4955  (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
4956  if (0 == res)
4957  {
4959  "Failed to route message, could not determine DV path\n");
4960  return rtt1;
4961  }
4963  "encapsulate_for_dv 1\n");
4964  rtt2 = encapsulate_for_dv (dv,
4965  res,
4966  hops,
4967  hdr,
4969  NULL,
4970  options & (~RMO_REDUNDANT),
4971  GNUNET_YES);
4972  }
4973  return GNUNET_TIME_relative_min (rtt1, rtt2);
4974 }
4975 
4976 
/**
 * Forward declaration; needed because the retransmission task defined
 * right below calls this function before its definition.
 *
 * @param cls the `struct VirtualLink` to consider
 */
4977 static void
4978 consider_sending_fc (void *cls);
4979 
/**
 * Task body for the flow-control retransmission: clears the link's
 * `fc_retransmit_task` handle, then calls consider_sending_fc().
 *
 * NOTE(review): the signature line (listing line 4987) is absent from this
 * extract; `cls` is the parameter used below and is treated as a
 * `struct VirtualLink`.
 */
4986 static void
4988 {
4989  struct VirtualLink *vl = cls;
/* Reset the handle first so the callee does not see a stale task. */
4990  vl->fc_retransmit_task = NULL;
4991  consider_sending_fc (cls);
4992 }
4993 
4994 
/**
 * Build and send a flow-control message for a virtual link and arrange
 * for its retransmission.
 *
 * Visible behaviour: records the transmission time, fills a
 * TransportFlowControlMessage with a fresh sequence number and the sender
 * time, stores the measured RTT in `last_fc_rtt` unless it equals
 * GNUNET_TIME_UNIT_FOREVER_REL, and (re)assigns `fc_retransmit_task` while
 * counting retransmissions.
 *
 * NOTE(review): large parts are absent from this extract — the signature
 * line (5002), the declaration of `duration` (5007), and listing lines
 * 5010, 5020, 5025, 5027, 5030-5032, 5034, 5037-5038, 5041-5042,
 * 5050-5051, 5053 and 5057. How `monotime` and `rtt` are obtained, the
 * remaining message fields, and the condition guarding the reset of
 * `fc_retransmit_count` are not visible here.
 */
5001 static void
5003 {
5004  struct VirtualLink *vl = cls;
5005  struct GNUNET_TIME_Absolute monotime;
5006  struct TransportFlowControlMessage fc;
5008  struct GNUNET_TIME_Relative rtt;
5009 
5011  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5012  it always! */
5013  /* For example, we should probably ONLY do this if a bit more than
5014  an RTT has passed, or if the window changed "significantly" since
5015  then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5016  need an estimate for the bandwidth-delay-product for the entire
5017  VL, as that determines "significantly". We have the delay, but
5018  the bandwidth statistics need to be added for the VL!*/(void) duration;
5019 
5021  "Sending FC seq %u to %s with new window %llu\n",
5022  (unsigned int) vl->fc_seq_gen,
5023  GNUNET_i2s (&vl->target),
5024  (unsigned long long) vl->incoming_fc_window_size);
5026  vl->last_fc_transmission = monotime;
5028  fc.header.size = htons (sizeof(fc));
/* Every FC message consumes one sequence number. */
5029  fc.seq = htonl (vl->fc_seq_gen++);
5033  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
/* FOREVER is treated as "transmission failed" (see the log text). */
5035  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5036  {
5039  "FC retransmission to %s failed, will retry in %s\n",
5040  GNUNET_i2s (&vl->target),
5043  }
5044  else
5045  {
5046  /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5047  vl->last_fc_rtt = rtt;
5048  }
5049  if (NULL != vl->fc_retransmit_task)
5052  {
5054  vl->fc_retransmit_count = 0;
5055  }
5056  vl->fc_retransmit_task =
5058  vl->fc_retransmit_count++;
5059 }
5060 
5061 
/**
 * Look for a pending message on a virtual link that may be transmitted
 * now and notify idle, validated queues that could carry it.
 *
 * Messages already assigned to a queue entry are skipped. If the first
 * remaining message does not fit the outbound flow-control window, a flow
 * control message is considered and the function returns. Otherwise
 * queues of the direct neighbour and of the first hop of every valid DV
 * path are examined, and the scan stops after that one message.
 *
 * NOTE(review): the signature line (listing line 5079) and listing lines
 * 5087, 5096, 5101, 5103, 5109, 5113, 5126, 5129, 5131, 5135, 5160, 5177,
 * 5180, 5182, 5186, 5194 and 5201 are absent from this extract; the calls
 * that actually schedule work on a queue are among the missing lines.
 * `vl` is the parameter used throughout.
 */
5078 static void
5080 {
5081  struct Neighbour *n = vl->n;
5082  struct DistanceVector *dv = vl->dv;
5083  struct GNUNET_TIME_Absolute now;
5084  struct VirtualLink *vl_next_hop;
5085  int elig;
5086 
5088  "check_vl_transmission to target %s\n",
5089  GNUNET_i2s (&vl->target));
5090  /* Check that we have an eligible pending message!
5091  (cheaper than having #transmit_on_queue() find out!) */
5092  elig = GNUNET_NO;
5093  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5094  pm = pm->next_vl)
5095  {
5097  "check_vl_transmission loop\n");
5098  if (NULL != pm->qe)
5099  continue; /* not eligible, is in a queue! */
5100  if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5102  {
5104  "Stalled message %lu transmission on VL %s due to flow control: %llu < %llu\n",
5105  pm->logging_uuid,
5106  GNUNET_i2s (&vl->target),
5107  (unsigned long long) vl->outbound_fc_window_size,
5108  (unsigned long long) (pm->bytes_msg
5110  consider_sending_fc (vl);
5111  return; /* We have a message, but flow control says "nope" */
5112  }
5114  "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5115  GNUNET_i2s (&vl->target));
5116  /* Notify queues at direct neighbours that we are interested */
5117  now = GNUNET_TIME_absolute_get ();
5118  if (NULL != n)
5119  {
5120  for (struct Queue *queue = n->queue_head; NULL != queue;
5121  queue = queue->next_neighbour)
5122  {
/* Only idle queues whose validation has not expired qualify. */
5123  if ((GNUNET_YES == queue->idle) &&
5124  (queue->validated_until.abs_value_us > now.abs_value_us))
5125  {
5127  "Direct neighbour %s not stalled\n",
5128  GNUNET_i2s (&n->pid));
5130  queue,
5132  elig = GNUNET_YES;
5133  }
5134  else
5136  "Neighbour Queue QID: %u (%u) busy or invalid\n",
5137  queue->qid,
5138  queue->idle);
5139  }
5140  }
5141  /* Notify queues via DV that we are interested */
5142  if (NULL != dv)
5143  {
5144  /* Do DV with lower scheduler priority, which effectively means that
5145  IF a neighbour exists and is available, we prefer it. */
5146  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5147  pos = pos->next_dv)
5148  {
5149  struct Neighbour *nh = pos->next_hop;
5150 
5151 
5152  if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5153  continue; /* skip this one: path not validated */
5154  else
5155  {
/* NOTE(review): the lookup result is dereferenced without a NULL check,
   although other callers of lookup_virtual_link() in this file test for
   NULL — confirm a virtual link always exists for a DV next hop. */
5156  vl_next_hop = lookup_virtual_link (&nh->pid);
5157  if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5158  vl_next_hop->outbound_fc_window_size)
5159  {
5161  "Stalled message %lu transmission on next hop %s due to flow control: %llu < %llu\n",
5162  pm->logging_uuid,
5163  GNUNET_i2s (&vl_next_hop->target),
5164  (unsigned long
5165  long) vl_next_hop->outbound_fc_window_size,
5166  (unsigned long long) (pm->bytes_msg
5167  + vl_next_hop->
5168  outbound_fc_window_size_used));
5169  consider_sending_fc (vl_next_hop);
5170  continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5171  }
5172  for (struct Queue *queue = nh->queue_head; NULL != queue;
5173  queue = queue->next_neighbour)
5174  if ((GNUNET_YES == queue->idle) &&
5175  (queue->validated_until.abs_value_us > now.abs_value_us))
5176  {
5178  "Next hop neighbour %s not stalled\n",
5179  GNUNET_i2s (&nh->pid));
5181  queue,
5183  elig = GNUNET_YES;
5184  }
5185  else
5187  "DV Queue QID: %u (%u) busy or invalid\n",
5188  queue->qid,
5189  queue->idle);
5190  }
5191  }
5192  }
/* NOTE(review): the statement guarded by this `if` starts on the missing
   line 5194 (presumably a log call); the `break` below then appears to be
   unconditional, so only one message per invocation is considered. */
5193  if (GNUNET_YES == elig)
5195  "Eligible message %lu of size %llu to %s: %llu/%llu\n",
5196  pm->logging_uuid,
5197  pm->bytes_msg,
5198  GNUNET_i2s (&vl->target),
5199  (unsigned long long) vl->outbound_fc_window_size,
5200  (unsigned long long) (pm->bytes_msg
5202  break;
5203  }
5204 }
5205 
5206 
/**
 * Handle an OutboundMessage from a CORE client: wrap the embedded message
 * in a PendingMessage, attach it to the target's virtual link, and check
 * whether it can be transmitted.
 *
 * If no confirmed virtual link exists for the peer, the message is only
 * counted as dropped.
 *
 * NOTE(review): listing lines 5221 (declaration of `pp`), 5230,
 * 5237-5238, 5246, 5256, 5261, 5265 and 5270 are absent from this extract,
 * including the openers of the two list-insert calls and whatever
 * concludes the handler.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param obm the message, already validated by check_client_send()
 */
5213 static void
5214 handle_client_send (void *cls, const struct OutboundMessage *obm)
5215 {
5216  struct TransportClient *tc = cls;
5217  struct PendingMessage *pm;
5218  const struct GNUNET_MessageHeader *obmm;
5219  uint32_t bytes_msg;
5220  struct VirtualLink *vl;
5222 
5223  GNUNET_assert (CT_CORE == tc->type);
5224  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5225  bytes_msg = ntohs (obmm->size);
5226  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5227  vl = lookup_virtual_link (&obm->peer);
5228  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5229  {
5231  "Don't have %s as a neighbour (anymore).\n",
5232  GNUNET_i2s (&obm->peer));
5233  /* Failure: don't have this peer as a neighbour (anymore).
5234  Might have gone down asynchronously, so this is NOT
5235  a protocol violation by CORE. Still count the event,
5236  as this should be rare. */
5239  "# messages dropped (neighbour unknown)",
5240  1,
5241  GNUNET_NO);
5242  return;
5243  }
5244 
/* The message bytes are stored directly behind the PendingMessage. */
5245  pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5247  "1 created pm %p storing vl %p\n",
5248  pm,
5249  vl);
5250  pm->logging_uuid = logging_uuid_gen++;
5251  pm->prefs = pp;
5252  pm->client = tc;
5253  pm->vl = vl;
5254  pm->bytes_msg = bytes_msg;
5255  memcpy (&pm[1], obmm, bytes_msg);
5257  "Sending %u bytes as <%llu> to %s\n",
5258  bytes_msg,
5259  pm->logging_uuid,
5260  GNUNET_i2s (&obm->peer));
/* The message is linked into two lists: the client's and the link's. */
5262  tc->details.core.pending_msg_head,
5263  tc->details.core.pending_msg_tail,
5264  pm);
5266  vl->pending_msg_head,
5267  vl->pending_msg_tail,
5268  pm);
5269  check_vl_transmission (vl);
5271 }
5272 
5273 
/**
 * Handle a communicator's request to send a backchannel message to the
 * communicator of another peer.
 *
 * The embedded message and the trailing 0-terminated string are copied
 * into a TransportBackchannelEncapsulationMessage built in a stack
 * buffer. The result is routed over the confirmed virtual link to the
 * peer if there is one, otherwise directly via the neighbour if known.
 *
 * NOTE(review): the function-name line (listing line 5284) and listing
 * lines 5299-5301 (rest of the `mbuf` declaration and the declaration of
 * `be`), 5305, 5313, 5324, 5331 and 5336 are absent from this extract.
 * When neither a virtual link nor a neighbour exists, no transmission is
 * visible below.
 *
 * @param cls the `struct TransportClient` of the requesting communicator
 * @param cb the request, validated by the matching check function
 */
5283 static void
5285  void *cls,
5286  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
5287 {
5288  struct Neighbour *n;
5289  struct VirtualLink *vl;
5290  struct TransportClient *tc = cls;
5291  const struct GNUNET_MessageHeader *inbox =
5292  (const struct GNUNET_MessageHeader *) &cb[1];
5293  uint16_t isize = ntohs (inbox->size);
/* The string starts right behind the embedded message. */
5294  const char *is = ((const char *) &cb[1]) + isize;
5295  size_t slen = strlen (is) + 1;
5296  char
5297  mbuf[slen + isize
5298  + sizeof(struct
5302 
5303  /* 0-termination of 'is' was checked already in
5304  #check_communicator_backchannel() */
5306  "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5307  GNUNET_i2s (&cb->pid),
5308  is,
5309  ntohs (inbox->type),
5310  ntohs (inbox->size));
5311  /* encapsulate and encrypt message */
5312  be->header.type =
5314  be->header.size = htons (sizeof(mbuf));
5315  memcpy (&be[1], inbox, isize);
/* Same length as `slen` computed above, terminator included. */
5316  memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5317  + isize],
5318  is,
5319  strlen (is) + 1);
5320  // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5321  vl = lookup_virtual_link (&cb->pid);
5322  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5323  {
5325  }
5326  else
5327  {
5328  /* Use route via neighbour */
5329  n = lookup_neighbour (&cb->pid);
5330  if (NULL != n)
5332  n,
5333  &be->header,
5334  RMO_NONE);
5335  }
5337 }
5338 
5339 
/**
 * Validate an AddAddressMessage: the sender must be registered as a
 * communicator.
 *
 * NOTE(review): the line with the function name and the `cls` parameter
 * (listing line 5348) and listing line 5358 are absent from this extract.
 * A comment in the add-address handler says the 0-termination of the
 * trailing address is checked here, presumably on line 5358 — confirm.
 *
 * @param aam the message to validate
 * @return #GNUNET_OK if accepted, #GNUNET_SYSERR otherwise
 */
5347 static int
5349  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5350 {
5351  struct TransportClient *tc = cls;
5352 
5353  if (CT_COMMUNICATOR != tc->type)
5354  {
5355  GNUNET_break (0);
5356  return GNUNET_SYSERR;
5357  }
5359  return GNUNET_OK;
5360 }
5361 
5362 
/**
 * Forward declaration; needed because the peerstore completion callback
 * defined right below passes this function as the task to run next.
 *
 * @param cls the `struct AddressListEntry` to store
 */
5368 static void
5369 store_pi (void *cls);
5370 
5371 
/**
 * Completion callback for storing one of our own addresses in the
 * peerstore.
 *
 * Clears the entry's store handle, logs the outcome and arranges for
 * store_pi() to run again on the same entry.
 *
 * NOTE(review): listing lines 5385, 5389 and 5395 are absent from this
 * extract (the log call openers and the opener of the scheduling call,
 * including the value that is divided by 4).
 *
 * @param cls the `struct AddressListEntry` that was stored
 * @param success #GNUNET_YES if the store operation succeeded
 */
5378 static void
5379 peerstore_store_own_cb (void *cls, int success)
5380 {
5381  struct AddressListEntry *ale = cls;
5382 
/* The operation is finished; forget its handle. */
5383  ale->sc = NULL;
5384  if (GNUNET_YES != success)
5386  "Failed to store our own address `%s' in peerstore!\n",
5387  ale->address);
5388  else
5390  "Successfully stored our own address `%s' in peerstore!\n",
5391  ale->address);
5392  /* refresh period is 1/4 of expiration time, that should be plenty
5393  without being excessive. */
5394  ale->st =
5396  4ULL),
5397  &store_pi,
5398  ale);
5399 }
5400 
5401 
/**
 * Task that writes one of our own addresses to the peerstore.
 *
 * Clears the entry's task handle, produces a serialized address buffer,
 * starts a store operation under the "transport" subsystem for
 * GST_my_identity with peerstore_store_own_cb() as continuation, and frees
 * the buffer. If no store handle was obtained, the failure is logged and
 * a new task is assigned to `ale->st`.
 *
 * NOTE(review): listing lines 5413 (declaration of `expiration`),
 * 5416-5417, 5420-5421, 5423-5424, 5427, 5430, 5434-5435, 5440 and 5444
 * are absent from this extract; how `expiration` is computed, how the
 * address is signed/serialized, and the delay used for the retry are not
 * visible here.
 *
 * @param cls the `struct AddressListEntry` to store
 */
5407 static void
5408 store_pi (void *cls)
5409 {
5410  struct AddressListEntry *ale = cls;
5411  void *addr;
5412  size_t addr_len;
5414 
/* This task is running now; forget its handle. */
5415  ale->st = NULL;
5418  "Storing our address `%s' in peerstore until %s!\n",
5419  ale->address,
5422  ale->nt,
5425  &addr,
5426  &addr_len);
5428  "transport",
5429  &GST_my_identity,
5431  addr,
5432  addr_len,
5433  expiration,
5436  ale);
/* The buffer is released right after the store call was issued. */
5437  GNUNET_free (addr);
5438  if (NULL == ale->sc)
5439  {
5441  "Failed to store our address `%s' with peerstore\n",
5442  ale->address);
5443  ale->st =
5445  }
5446 }
5447 
5448 
/**
 * Handle a communicator announcing a new address of ours.
 *
 * Allocates an AddressListEntry with the address string stored directly
 * behind it, fills it from the message, links it into the communicator's
 * address list and schedules store_pi() to persist it immediately.
 *
 * NOTE(review): the line with the function name and the `cls` parameter
 * (listing line 5456) and listing lines 5464 and 5479 are absent from this
 * extract.
 *
 * @param aam the announcement; the address string follows the fixed part
 */
5455 static void
5457  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5458 {
5459  struct TransportClient *tc = cls;
5460  struct AddressListEntry *ale;
5461  size_t slen;
5462 
5463  /* 0-termination of &aam[1] was checked in #check_add_address */
5465  "Communicator added address `%s'!\n",
5466  (const char *) &aam[1]);
/* slen counts every trailing byte of the message. */
5467  slen = ntohs (aam->header.size) - sizeof(*aam);
5468  ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5469  ale->tc = tc;
/* The address pointer refers to the storage behind the entry itself. */
5470  ale->address = (const char *) &ale[1];
5471  ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
/* aid is copied without byte-order conversion; it is only compared for
   equality in the delete handler. */
5472  ale->aid = aam->aid;
5473  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5474  memcpy (&ale[1], &aam[1], slen);
5475  GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5476  tc->details.communicator.addr_tail,
5477  ale);
5478  ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5480 }
5481 
5482 
/**
 * Handle a communicator withdrawing one of our addresses.
 *
 * Clients that are not communicators are dropped. The communicator's
 * address list is searched for the entry whose `aid` matches the message;
 * handling stops at the first match. If nothing matches, the situation is
 * only reported.
 *
 * NOTE(review): the line with the function name and the `cls` parameter
 * (listing line 5490) and listing lines 5510, 5513-5514, 5517 and 5519 are
 * absent from this extract; the statements that release the matching
 * entry and conclude the handler are not visible here.
 *
 * @param dam the message naming the address id to remove
 */
5489 static void
5491  const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5492 {
5493  struct TransportClient *tc = cls;
5494  struct AddressListEntry *alen;
5495 
5496  if (CT_COMMUNICATOR != tc->type)
5497  {
5498  GNUNET_break (0);
5499  GNUNET_SERVICE_client_drop (tc->client);
5500  return;
5501  }
5502  for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5503  NULL != ale;
5504  ale = alen)
5505  {
/* The successor is fetched before the entry is examined, so advancing
   does not depend on the entry afterwards. */
5506  alen = ale->next;
5507  if (dam->aid != ale->aid)
5508  continue;
5509  GNUNET_assert (ale->tc == tc);
5511  "Communicator deleted address `%s'!\n",
5512  ale->address);
5515  return;
5516  }
5518  "Communicator removed address we did not even have.\n");
5520  // GNUNET_SERVICE_client_drop (tc->client);
5521 }
5522 
5523 
/**
 * Forward declaration of a static function taking a message header as its
 * last parameter.
 *
 * NOTE(review): the line with the function name and the first
 * parameter(s) (listing line 5532) is absent from this extract, so the
 * identity of the declared function cannot be established from here —
 * confirm against the real file.
 */
5531 static void
5533  const struct GNUNET_MessageHeader *msg);
5534 
5535 
/**
 * Callback run for a CoreSentContext once its message has been handed to
 * CORE.
 *
 * If the context no longer references a virtual link it is simply freed.
 * Otherwise `size` bytes are released from the link's
 * `incoming_fc_window_size_ram`, `isize` bytes are added to
 * `incoming_fc_window_size_used`, a flow-control update is considered and
 * the context is freed.
 *
 * NOTE(review): listing lines 5555-5556 are absent from this extract;
 * any unlinking of the context or underflow check before the subtraction
 * is not visible here.
 *
 * @param cls the `struct CoreSentContext` of the delivered message
 */
5543 static void
5544 core_env_sent_cb (void *cls)
5545 {
5546  struct CoreSentContext *ctx = cls;
5547  struct VirtualLink *vl = ctx->vl;
5548 
5549  if (NULL == vl)
5550  {
5551  /* lost the link in the meantime, ignore */
5552  GNUNET_free (ctx);
5553  return;
5554  }
5557  vl->incoming_fc_window_size_ram -= ctx->size;
5558  vl->incoming_fc_window_size_used += ctx->isize;
5559  consider_sending_fc (vl);
5560  GNUNET_free (ctx);
5561 }
5562 
5563 
/**
 * Handle a message destined for CORE: validate its size, require a confirmed
 * virtual link to the sender, and copy the message to every client of type
 * CT_CORE.
 *
 * NOTE(review): several lines (log and statistics calls, one window check,
 * the envelope allocation and the tail of the function) are absent from this
 * listing; the comments below only describe what is visible.
 *
 * @param cls the `struct CommunicatorMessageContext` of the message
 * @param mh the message to deliver
 */
5572 static void
5573 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5574 {
5575  struct CommunicatorMessageContext *cmc = cls;
5576  struct VirtualLink *vl;
5577  uint16_t size = ntohs (mh->size);
5578  int have_core;
5579 
5581  "Handling raw message of type %u with %u bytes\n",
5582  (unsigned int) ntohs (mh->type),
5583  (unsigned int) ntohs (mh->size));
5584 
      /* The message must still fit into 16 bits once a
         `struct InboundMessage` is put in front of it, and must be at least
         a bare header; otherwise the communicator client is dropped. */
5585  if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
5586  (size < sizeof(struct GNUNET_MessageHeader)))
5587  {
      /* Remember the client first: `cmc` is handed to finish_cmc_handling()
         before the client is dropped. */
5588  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5589 
5590  GNUNET_break (0);
5591  finish_cmc_handling (cmc);
5592  GNUNET_SERVICE_client_drop (client);
5593  return;
5594  }
5595  vl = lookup_virtual_link (&cmc->im.sender);
5596  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5597  {
5598  /* FIXME: sender is giving us messages for CORE but we don't have
5599  the link up yet! I *suspect* this can happen right now (i.e.
5600  sender has verified us, but we didn't verify sender), but if
5601  we pass this on, CORE would be confused (link down, messages
5602  arrive). We should investigate more if this happens often,
5603  or in a persistent manner, and possibly do "something" about
5604  it. Thus logging as error for now. */
5605  GNUNET_break_op (0);
5607  "# CORE messages dropped (virtual link still down)",
5608  1,
5609  GNUNET_NO);
5610 
5612  "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
5613  (unsigned int) ntohs (mh->type),
5614  (unsigned int) ntohs (mh->size));
5615  finish_cmc_handling (cmc);
5616  return;
5617  }
      /* Guard against the RAM window counter overflowing when `size` is
         added to it. */
5618  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5619  {
5621  "# CORE messages dropped (FC arithmetic overflow)",
5622  1,
5623  GNUNET_NO);
5625  "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5626  (unsigned int) ntohs (mh->type),
5627  (unsigned int) ntohs (mh->size));
5628  finish_cmc_handling (cmc);
5629  return;
5630  }
      /* NOTE(review): the condition of the following block is not present in
         this listing; per its messages it covers an FC window overflow. */
5632  {
5634  "# CORE messages dropped (FC window overflow)",
5635  1,
5636  GNUNET_NO);
5638  "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5639  (unsigned int) ntohs (mh->type),
5640  (unsigned int) ntohs (mh->size));
5641  finish_cmc_handling (cmc);
5642  return;
5643  }
5644 
5645  /* Forward to all CORE clients */
5646  have_core = GNUNET_NO;
5647  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5648  {
5649  struct GNUNET_MQ_Envelope *env;
5650  struct InboundMessage *im;
5651  struct CoreSentContext *ctx;
5652 
5653  if (CT_CORE != tc->type)
5654  continue;
5657  ctx = GNUNET_new (struct CoreSentContext);
5658  ctx->vl = vl;
5659  ctx->size = size;
      /* Only the context of the first CORE client carries a non-zero
         `isize`; all further copies get 0. */
5660  ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5661  have_core = GNUNET_YES;
5664  im->peer = cmc->im.sender;
5665  memcpy (&im[1], mh, size);
5666  GNUNET_MQ_send (tc->mq, env);
5667  vl->core_recv_window--;
5668  }
5669  if (GNUNET_NO == have_core)
5670  {
5672  "Dropped message to CORE: no CORE client connected!\n");
5673  /* Nevertheless, count window as used, as it is from the
5674  perspective of the other peer! */
5676  /* TODO-M1 */
5678  "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
5679  (unsigned int) ntohs (mh->type),
5680  (unsigned int) ntohs (mh->size));
5681  finish_cmc_handling (cmc);
5682  return;
5683  }
5685  "Delivered message from %s of type %u to CORE\n",
5686  GNUNET_i2s (&cmc->im.sender),
5687  ntohs (mh->type));
      /* While the CORE receive window is still positive the communicator may
         continue right away. */
5688  if (vl->core_recv_window > 0)
5689  {
5690  finish_cmc_handling (cmc);
5691  return;
5692  }
5693  /* Wait with calling #finish_cmc_handling(cmc) until the message
5694  was processed by CORE MQs (for CORE flow control)! */
5696 }
5697 
5698 
/**
 * Sanity-check the size fields of a fragment message `fb`.
 *
 * NOTE(review): the line with the function name and parameter list is not
 * present in this listing; the body uses `cls` (ignored) and `fb`.
 *
 * @return #GNUNET_YES if the fragment is non-empty and lies within the
 *         announced message size, #GNUNET_SYSERR otherwise
 */
5706 static int
5708 {
5709  uint16_t size = ntohs (fb->header.size);
      /* Number of payload bytes following the fragment header. */
5710  uint16_t bsize = size - sizeof(*fb);
5711 
5712  (void) cls;
5713  if (0 == bsize)
5714  {
5715  GNUNET_break_op (0);
5716  return GNUNET_SYSERR;
5717  }
      /* The fragment must end within the announced total message size. */
5718  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
5719  {
5720  GNUNET_break_op (0);
5721  return GNUNET_SYSERR;
5722  }
      /* The fragment must start before the end of the message. */
5723  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
5724  {
5725  GNUNET_break_op (0);
5726  return GNUNET_SYSERR;
5727  }
5728  return GNUNET_YES;
5729 }
5730 
5731 
/**
 * Task releasing a `struct AcknowledgementCummulator` that no longer holds
 * any pending acknowledgements.
 *
 * NOTE(review): the name/parameter line and the call inside the second
 * assertion are not present in this listing.
 */
5737 static void
5739 {
5740  struct AcknowledgementCummulator *ac = cls;
5741 
      /* This function is the task itself, so forget the handle. */
5742  ac->task = NULL;
      /* Only an empty cummulator may be destroyed. */
5743  GNUNET_assert (0 == ac->num_acks);
5744  GNUNET_assert (
5745  GNUNET_YES ==
5747  GNUNET_free (ac);
5748 }
5749 
5750 
/**
 * Task building one `struct TransportReliabilityAckMessage` from all
 * acknowledgements collected in a `struct AcknowledgementCummulator` and
 * handing it to a routing function; afterwards the cummulator is emptied.
 *
 * NOTE(review): the name/parameter line, parts of the buffer declaration,
 * the header type assignment, the routing calls and the final scheduling
 * call are not present in this listing.
 */
5756 static void
5758 {
5759  struct Neighbour *n;
5760  struct VirtualLink *vl;
5761  struct AcknowledgementCummulator *ac = cls;
      /* Stack buffer sized by the number of collected ACKs. */
5762  char buf[sizeof(struct TransportReliabilityAckMessage)
5763  + ac->num_acks
5765  struct TransportReliabilityAckMessage *ack =
5768 
5769  ac->task = NULL;
5771  "Sending ACK with %u components to %s\n",
5772  ac->num_acks,
5773  GNUNET_i2s (&ac->target));
5774  GNUNET_assert (0 < ac->num_acks);
5776  ack->header.size =
5777  htons (sizeof(*ack)
5778  + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
      /* The running counter is advanced by the number of ACKs in this
         message before being written to the wire. */
5779  ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
5780  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
5781  for (unsigned int i = 0; i < ac->num_acks; i++)
5782  {
5783  ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
5785  GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5786  }
5787  /*route_control_message_without_fc (
5788  &ac->target,
5789  &ack->header,
5790  RMO_DV_ALLOWED);*/
      /* Use the confirmed virtual link if there is one, otherwise fall back
         to the direct neighbour (if any). */
5791  vl = lookup_virtual_link (&ac->target);
5792  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5793  {
5795  vl,
5796  &ack->header,
5797  RMO_DV_ALLOWED);
5798  }
5799  else
5800  {
5801  /* Use route via neighbour */
5802  n = lookup_neighbour (&ac->target);
5803  if (NULL != n)
5805  n,
5806  &ack->header,
5807  RMO_NONE);
5808  }
5809  ac->num_acks = 0;
5812  ac);
5813 }
5814 
5815 
/**
 * Queue an acknowledgement for `ack_uuid` towards peer `pid`, to be sent no
 * later than `max_delay`.  ACKs for the same peer are collected in a
 * `struct AcknowledgementCummulator` holding up to #MAX_CUMMULATIVE_ACKS
 * entries.
 *
 * NOTE(review): the name line, the cummulator lookup/creation, the map
 * insertion and the task callback argument are not present in this listing.
 *
 * @param ack_uuid UUID to acknowledge
 * @param max_delay latest time at which the ACK should be transmitted
 */
5824 static void
5826  const struct AcknowledgementUUIDP *ack_uuid,
5827  struct GNUNET_TIME_Absolute max_delay)
5828 {
5829  struct AcknowledgementCummulator *ac;
5830 
5832  "Scheduling ACK %s for transmission to %s\n",
5833  GNUNET_uuid2s (&ack_uuid->value),
5834  GNUNET_i2s (pid));
5836  if (NULL == ac)
5837  {
5839  ac->target = *pid;
5840  ac->min_transmission_time = max_delay;
5844  &ac->target,
5845  ac,
5847  }
5848  else
5849  {
5850  if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
5851  {
5852  /* must run immediately, ack buffer full! */
5854  }
      /* The pending task is replaced below; the deadline only ever moves
         earlier. */
5855  GNUNET_SCHEDULER_cancel (ac->task);
5856  ac->min_transmission_time =
5857  GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
5858  }
5859  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
5860  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
5861  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
5862  ac->num_acks++;
5863  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
5865  ac);
5866 }
5867 
5868 
/* Body of a structure whose header line and at least one member declaration
   are not present in this listing.
   NOTE(review): presumably `struct FindByMessageUuidContext`, which
   find_by_message_uuid() accesses through members `message_uuid` and `rc`
   -- confirm against the full file. */
5873 {
5877  struct MessageUUIDP message_uuid;
5878 
5883 };
5884 
5885 
5895 static int
5896 find_by_message_uuid (void *cls, uint32_t key, void *value)
5897 {
5898  struct FindByMessageUuidContext *fc = cls;
5899  struct ReassemblyContext *rc = value;
5900 
5901  (void) key;
5902  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
5903  {
5904  fc->rc = rc;
5905  return GNUNET_NO;
5906  }
5907  return GNUNET_YES;
5908 }
5909 
5910 
/**
 * Process a fragment `fb`: locate or create the reassembly context for its
 * message UUID, copy the payload into the reassembly buffer, track which
 * bytes have arrived, schedule a cumulative ACK and, once no bytes are
 * missing, pass the reassembled message to demultiplex_with_cmc().
 *
 * NOTE(review): the name/parameter line, the map and heap creation, the
 * lookup call, the timeout/heap bookkeeping, the base ACK delay and the
 * final cleanup call are not present in this listing.
 */
5918 static void
5920 {
5921  struct CommunicatorMessageContext *cmc = cls;
5922  struct VirtualLink *vl;
5923  struct ReassemblyContext *rc;
5924  const struct GNUNET_MessageHeader *msg;
5925  uint16_t msize;
5926  uint16_t fsize;
5927  uint16_t frag_off;
5928  char *target;
5929  struct GNUNET_TIME_Relative cdelay;
5930  struct FindByMessageUuidContext fc;
5931 
      /* Fragments are only accepted over a confirmed virtual link; otherwise
         the communicator client is dropped. */
5932  vl = lookup_virtual_link (&cmc->im.sender);
5933  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5934  {
5935  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5936 
5938  "No virtual link for %s to handle fragment\n",
5939  GNUNET_i2s (&cmc->im.sender));
5940  GNUNET_break (0);
5941  finish_cmc_handling (cmc);
5942  GNUNET_SERVICE_client_drop (client);
5943  return;
5944  }
5945  if (NULL == vl->reassembly_map)
5946  {
5948  vl->reassembly_heap =
5953  vl);
5954  }
5955  msize = ntohs (fb->msg_size);
5956  fc.message_uuid = fb->msg_uuid;
5957  fc.rc = NULL;
5959  fb->msg_uuid.uuid,
5961  &fc);
      /* Payload bytes carried by this fragment. */
5962  fsize = ntohs (fb->header.size) - sizeof(*fb);
5963  if (NULL == (rc = fc.rc))
5964  {
      /* One allocation holds the context, the payload buffer and a bitfield
         with one bit per payload byte. */
5965  rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
5966  + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
5967  rc->msg_uuid = fb->msg_uuid;
5968  rc->virtual_link = vl;
5969  rc->msg_size = msize;
5970  rc->reassembly_timeout =
5974  rc,
5978  vl->reassembly_map,
5979  rc->msg_uuid.uuid,
5980  rc,
5982  target = (char *) &rc[1];
5983  rc->bitfield = (uint8_t *) (target + rc->msg_size);
5984  if (fsize != rc->msg_size)
5985  rc->msg_missing = rc->msg_size;
5986  else
5987  rc->msg_missing = 0;
5989  "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
5990  fsize,
5991  ntohs (fb->frag_off),
5992  msize,
5993  rc->msg_missing,
5994  GNUNET_i2s (&cmc->im.sender),
5995  (unsigned int) fb->msg_uuid.uuid);
5996  }
5997  else
5998  {
5999  target = (char *) &rc[1];
6001  "Received fragment at offset %u/%u from %s for message %u\n",
6002  ntohs (fb->frag_off),
6003  msize,
6004  GNUNET_i2s (&cmc->im.sender),
6005  (unsigned int) fb->msg_uuid.uuid);
6006  }
      /* All fragments of one message must announce the same total size. */
6007  if (msize != rc->msg_size)
6008  {
6009  GNUNET_break (0);
6010  finish_cmc_handling (cmc);
6011  return;
6012  }
6013 
6014  /* reassemble */
6015  if (0 == fsize)
6016  {
6017  GNUNET_break (0);
6018  finish_cmc_handling (cmc);
6019  return;
6020  }
6021  frag_off = ntohs (fb->frag_off);
6022  if (frag_off + fsize > msize)
6023  {
6024  /* Fragment (plus fragment size) exceeds message size! */
6025  GNUNET_break_op (0);
6026  finish_cmc_handling (cmc);
6027  return;
6028  }
6029  memcpy (&target[frag_off], &fb[1], fsize);
6030  /* update bitfield and msg_missing */
6031  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6032  {
      /* Count each byte only the first time it is seen. */
6033  if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6034  {
6035  rc->bitfield[i / 8] |= (1 << (i % 8));
6036  rc->msg_missing--;
6037  }
6038  }
6039 
6040  /* Compute cumulative ACK */
      /* The delay is scaled by roughly the number of fragments still
         outstanding and becomes zero once nothing is missing. */
6042  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6043  if (0 == rc->msg_missing)
6044  cdelay = GNUNET_TIME_UNIT_ZERO;
6045  cummulative_ack (&cmc->im.sender,
6046  &fb->ack_uuid,
6049  /* is reassembly complete? */
6050  if (0 != rc->msg_missing)
6051  {
6052  finish_cmc_handling (cmc);
6053  return;
6054  }
6055  /* reassembly is complete, verify result */
6056  msg = (const struct GNUNET_MessageHeader *) &rc[1];
6057  if (ntohs (msg->size) != rc->msg_size)
6058  {
6059  GNUNET_break (0);
6061  finish_cmc_handling (cmc);
6062  return;
6063  }
6064  /* successful reassembly */
6066  "Fragment reassembly complete for message %u\n",
6067  (unsigned int) fb->msg_uuid.uuid);
6068  /* FIXME: check that the resulting msg is NOT a
6069  DV Box or Reliability Box, as that is NOT allowed! */
6070  demultiplex_with_cmc (cmc, msg);
6071  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6072  en-route and we forget that we finished this reassembly immediately!
6073  -> keep around until timeout?
6074  -> shorten timeout based on ACK? */
6076 }
6077 
6078 
/**
 * Check a reliability box message `rb`.  In the visible lines the inner
 * message header is only used as input for a log statement and
 * #GNUNET_YES is returned.
 *
 * NOTE(review): the name line, the log call opener and one statement before
 * the return are not present in this listing, so further validation may
 * exist -- confirm against the full file.
 *
 * @param rb the reliability box; the inner message follows it directly
 * @return #GNUNET_YES
 */
6086 static int
6088  const struct TransportReliabilityBoxMessage *rb)
6089 {
6090  (void) cls;
6091  const struct GNUNET_MessageHeader *inbox = (const struct
6092  GNUNET_MessageHeader *) &rb[1];
6093 
6095  "check_send_msg with size %u: inner msg type %u and size %u (%u %u)\n",
6096  ntohs (rb->header.size),
6097  ntohs (inbox->type),
6098  ntohs (inbox->size),
6099  sizeof (struct TransportReliabilityBoxMessage),
6100  sizeof (struct GNUNET_MessageHeader));
6102  return GNUNET_YES;
6103 }
6104 
6105 
/**
 * Handle a reliability box `rb`: schedule a cumulative ACK for its UUID and
 * continue processing with the message contained in the box.
 *
 * NOTE(review): the name line, the log call opener and part of the ACK
 * deadline expression are not present in this listing.
 *
 * @param rb the reliability box; the inner message follows it directly
 */
6113 static void
6115  const struct TransportReliabilityBoxMessage *rb)
6116 {
6117  struct CommunicatorMessageContext *cmc = cls;
6118  const struct GNUNET_MessageHeader *inbox =
6119  (const struct GNUNET_MessageHeader *) &rb[1];
6120  struct GNUNET_TIME_Relative rtt;
6121 
6123  "Received reliability box from %s with UUID %s of type %u\n",
6124  GNUNET_i2s (&cmc->im.sender),
6125  GNUNET_uuid2s (&rb->ack_uuid.value),
6126  (unsigned int) ntohs (inbox->type));
6127  rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6128  do not really have an RTT for the
6129  * incoming* queue (should we have
6130  the sender add it to the rb message?) */
      /* The ACK deadline depends on whether the sender's countdown reached
         zero; the non-zero case uses an eighth of `rtt`. */
6131  cummulative_ack (
6132  &cmc->im.sender,
6133  &rb->ack_uuid,
6134  (0 == ntohl (rb->ack_countdown))
6137  GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6138  /* continue with inner message */
6139  /* FIXME: check that inbox is NOT a DV Box, fragment or another
6140  reliability box (not allowed!) */
6141  demultiplex_with_cmc (cmc, inbox);
6142 }
6143 
6144 
6153 static void
6154 update_pd_age (struct PerformanceData *pd, unsigned int age)
6155 {
6156  unsigned int sage;
6157 
6158  if (age == pd->last_age)
6159  return; /* nothing to do */
6160  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6161  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6162  {
6163  struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6164 
6165  the->bytes_sent = 0;
6166  the->bytes_received = 0;
6167  }
6168  pd->last_age = age;
6169 }
6170 
6171 
/**
 * Fold a new round-trip measurement into the performance data `pd` and
 * credit the acknowledged bytes to the history slot of the current age.
 *
 * NOTE(review): the line with the function name and first parameter is not
 * present in this listing; callers below invoke update_performance_data()
 * with a matching argument list.
 *
 * @param rtt measured round-trip time
 * @param bytes_transmitted_ok number of bytes confirmed as received
 */
6180 static void
6182  struct GNUNET_TIME_Relative rtt,
6183  uint16_t bytes_transmitted_ok)
6184 {
6185  uint64_t nval = rtt.rel_value_us;
6186  uint64_t oval = pd->aged_rtt.rel_value_us;
6187  unsigned int age = get_age ();
6188  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6189 
      /* The first sample (old value is "forever") is taken as is; later
         samples are averaged with weight 1/8 for the new value. */
6190  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6191  pd->aged_rtt = rtt;
6192  else
6193  pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
      /* Age the history first so stale slots are cleared before the bytes
         are added. */
6194  update_pd_age (pd, age);
6195  the->bytes_received += bytes_transmitted_ok;
6196 }
6197 
6198 
/**
 * Record a successful transmission in the performance data embedded in `q`
 * by forwarding to update_performance_data().
 *
 * NOTE(review): the line with the function name and the declaration of `q`
 * is not present in this listing.
 *
 * @param rtt measured round-trip time
 * @param bytes_transmitted_ok number of bytes confirmed as received
 */
6206 static void
6208  struct GNUNET_TIME_Relative rtt,
6209  uint16_t bytes_transmitted_ok)
6210 {
6211  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6212 }
6213 
6214 
/**
 * Record a successful transmission in the performance data embedded in `dvh`
 * by forwarding to update_performance_data().
 *
 * NOTE(review): the line with the function name and the declaration of `dvh`
 * is not present in this listing.
 *
 * @param rtt measured round-trip time
 * @param bytes_transmitted_ok number of bytes confirmed as received
 */
6222 static void
6224  struct GNUNET_TIME_Relative rtt,
6225  uint16_t bytes_transmitted_ok)
6226 {
6227  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6228 }
6229 
6230 
/**
 * React to the completed transmission of pending message `pm`, depending on
 * its type (`pm->pmt`).  For fragments the function walks up the
 * `frag_parent` chain while parents have no remaining fragments.
 *
 * NOTE(review): the name/parameter line, the log call openers and several
 * statements (apparently the calls releasing or answering for a message)
 * are not present in this listing.
 */
6238 static void
6240 {
6241  struct PendingMessage *pos;
6242 
6244  "Complete transmission of message %llu %u\n",
6245  pm->logging_uuid,
6246  pm->pmt);
6247  switch (pm->pmt)
6248  {
6249  case PMT_CORE:
6250  case PMT_RELIABILITY_BOX:
6251  /* Full message sent, we are done */
6253  return;
6254 
6255  case PMT_FRAGMENT_BOX:
6256  /* Fragment sent over reliable channel */
6257  pos = pm->frag_parent;
6258  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6260  /* check if subtree is done */
      /* A parent counts as done when it has no fragments left and its
         fragmentation offset reached the message size. */
6261  while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
6262  (NULL != pos->frag_parent))
6263  {
6264  pm = pos;
6265  pos = pm->frag_parent;
6266  if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6267  {
6269  return;
6270  }
6271  else if (PMT_DV_BOX == pm->pmt)
6272  {
6273  client_send_response (pos);
6274  return;
6275  }
6276  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6278  }
6279 
6280  /* Was this the last applicable fragment? */
6281  if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
6282  (pos->frag_off == pos->bytes_msg))
6283  client_send_response (pos);
6284  return;
6285 
6286  case PMT_DV_BOX:
6288  "Completed transmission of message %llu (DV Box)\n",
6289  pm->logging_uuid);
6290  if (NULL != pm->frag_parent)
6291  {
6292  if (NULL != pm->bpm)
6293  {
6294  GNUNET_free (pm->bpm);
6295  }
6296  client_send_response (pm->frag_parent);
6297  }
6298  else
6300  return;
6301  }
6302 }
6303 
6304 
/**
 * Process the acknowledgement of the transmission described by `pa`.  The
 * receiver-side `ack_delay` is subtracted from the measured delay when it is
 * not larger than that delay; the result is then applied to whichever of
 * `pa->queue`, `pa->dvh` and `pa->pm` is set.
 *
 * NOTE(review): the name/first-parameter line, the initial delay
 * computation, the branch taken when `delay` exceeds `ack_delay`, and the
 * statements guarded by the three NULL checks are not present in this
 * listing.
 *
 * @param ack_delay time the receiver waited before sending the ACK
 */
6312 static void
6314  struct GNUNET_TIME_Relative ack_delay)
6315 {
6316  struct GNUNET_TIME_Relative delay;
6317 
6319  if (delay.rel_value_us > ack_delay.rel_value_us)
6321  else
6322  delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6323  if (NULL != pa->queue)
6325  if (NULL != pa->dvh)
6327  if (NULL != pa->pm)
6330 }
6331 
6332 
/**
 * Validate the size of a reliability ACK message `ra`: the payload must hold
 * at least one `struct TransportCummulativeAckPayloadP` and be an exact
 * multiple of that structure's size.
 *
 * NOTE(review): the line with the function name and the `cls` parameter is
 * not present in this listing.
 *
 * @param ra the message to check
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
6340 static int
6342  const struct TransportReliabilityAckMessage *ra)
6343 {
6344  unsigned int n_acks;
6345 
6346  (void) cls;
6347  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6348  / sizeof(struct TransportCummulativeAckPayloadP);
6349  if (0 == n_acks)
6350  {
6351  GNUNET_break_op (0);
6352  return GNUNET_SYSERR;
6353  }
      /* Reject trailing bytes that do not form a complete entry. */
6354  if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6355  n_acks * sizeof(struct TransportCummulativeAckPayloadP))
6356  {
6357  GNUNET_break_op (0);
6358  return GNUNET_SYSERR;
6359  }
6360  return GNUNET_OK;
6361 }
6362 
6363 
/**
 * Handle a reliability ACK message `ra`: for every ACK entry with a known
 * pending acknowledgement call handle_acknowledged(); unknown UUIDs are
 * logged and skipped.
 *
 * NOTE(review): the name line, the lookup of the pending acknowledgement and
 * the log/statistics call openers are not present in this listing.
 *
 * @param ra the ACK message; the ACK entries follow it directly
 */
6371 static void
6373  const struct TransportReliabilityAckMessage *ra)
6374 {
6375  struct CommunicatorMessageContext *cmc = cls;
6376  const struct TransportCummulativeAckPayloadP *ack;
6377  unsigned int n_acks;
6378  uint32_t ack_counter;
6379 
6380  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6381  / sizeof(struct TransportCummulativeAckPayloadP);
6382  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
6383  for (unsigned int i = 0; i < n_acks; i++)
6384  {
6385  struct PendingAcknowledgement *pa =
6387  if (NULL == pa)
6388  {
6390  "Received ACK from %s with UUID %s which is unknown to us!\n",
6391  GNUNET_i2s (&cmc->im.sender),
6392  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6394  GST_stats,
6395  "# FRAGMENT_ACKS dropped, no matching pending message",
6396  1,
6397  GNUNET_NO);
6398  continue;
6399  }
6401  "Received ACK from %s with UUID %s\n",
6402  GNUNET_i2s (&cmc->im.sender),
6403  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6404  handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
6405  }
6406 
      /* NOTE(review): the value comes from the wire, so ntohl() would be the
         conventional call here (it performs the same byte swap as htonl()). */
6407  ack_counter = htonl (ra->ack_counter);
6408  (void) ack_counter; /* silence compiler warning for now */
6409  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
6410  // (DV and/or Neighbour?)
6411  finish_cmc_handling (cmc);
6412 }
6413 
6414 
/**
 * Validate a backchannel encapsulation `be`: the inner message must be
 * strictly smaller than the payload, and the bytes following it must end
 * with a 0-terminator.
 *
 * NOTE(review): the line with the function name and the declaration of `be`
 * are not present in this listing.
 *
 * @param cls unused
 * @return #GNUNET_YES if well-formed, #GNUNET_SYSERR otherwise
 */
6422 static int
6424  void *cls,
6426 {
      /* Payload bytes following the encapsulation header. */
6427  uint16_t size = ntohs (be->header.size) - sizeof(*be);
6428  const struct GNUNET_MessageHeader *inbox =
6429  (const struct GNUNET_MessageHeader *) &be[1];
6430  const char *is;
6431  uint16_t isize;
6432 
6433  (void) cls;
      /* At least one byte must remain after the inner message. */
6434  if (ntohs (inbox->size) >= size)
6435  {
6436  GNUNET_break_op (0);
6437  return GNUNET_SYSERR;
6438  }
6439  isize = ntohs (inbox->size);
6440  is = ((const char *) inbox) + isize;
6441  size -= isize;
      /* `is` points at the `size` bytes after the inner message; the last of
         them has to be the terminator. */
6442  if ('\0' != is[size - 1])
6443  {
6444  GNUNET_break_op (0);
6445  return GNUNET_SYSERR;
6446  }
6447  return GNUNET_YES;
6448 }
6449 
6450 
/**
 * Deliver the message contained in a backchannel encapsulation `be` to the
 * communicator client whose address prefix equals the string that follows
 * the inner message.  If no such client is connected the message is dropped.
 *
 * NOTE(review): the name line, the declaration of `be`, the argument of the
 * second GNUNET_asprintf(), the statistics call, the log call opener and the
 * envelope allocation are not present in this listing.
 *
 * @param cls the `struct CommunicatorMessageContext` of the message
 */
6459 static void
6461  void *cls,
6463 {
6464  struct CommunicatorMessageContext *cmc = cls;
6465  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
6466  struct GNUNET_MQ_Envelope *env;
6467  struct TransportClient *tc;
6468  const struct GNUNET_MessageHeader *inbox =
6469  (const struct GNUNET_MessageHeader *) &be[1];
6470  uint16_t isize = ntohs (inbox->size);
      /* The communicator name is stored right behind the inner message. */
6471  const char *target_communicator = ((const char *) inbox) + isize;
6472  char *sender;
6473  char *self;
6474 
      /* NOTE(review): `sender` and `self` are heap strings that are not
         freed in any of the lines visible here -- confirm against the full
         file whether they leak. */
6475  GNUNET_asprintf (&sender,
6476  "%s",
6477  GNUNET_i2s (&cmc->im.sender));
6478  GNUNET_asprintf (&self,
6479  "%s",
6481 
6482  /* Find client providing this communicator */
6483  for (tc = clients_head; NULL != tc; tc = tc->next)
6484  if ((CT_COMMUNICATOR == tc->type) &&
6485  (0 ==
6486  strcmp (tc->details.communicator.address_prefix, target_communicator)))
6487  break;
6488  if (NULL == tc)
6489  {
6490  char *stastr;
6491 
6492  GNUNET_asprintf (
6493  &stastr,
6494  "# Backchannel message dropped: target communicator `%s' unknown",
6495  target_communicator);
6497  GNUNET_free (stastr);
6498  finish_cmc_handling (cmc);
6499  return;
6500  }
6501  /* Finally, deliver backchannel message to communicator */
6503  "Delivering backchannel message from %s to %s of type %u to %s\n",
6504  sender,
6505  self,
6506  ntohs (inbox->type),
6507  target_communicator);
6509  cbi,
6510  isize,
6512  cbi->pid = cmc->im.sender;
6513  memcpy (&cbi[1], inbox, isize);
6514  GNUNET_MQ_send (tc->mq, env);
6515  finish_cmc_handling (cmc);
6516 }
6517 
6518 
/**
 * Task working through the hops of a distance vector, starting at its head.
 * If no hop is left afterwards the whole route is freed via free_dv_route();
 * otherwise the task is scheduled again.
 *
 * NOTE(review): the loop's break condition, the statement handling each hop
 * and the rescheduling call are not present in this listing.
 *
 * @param cls the `struct DistanceVector` to clean up
 */
6528 static void
6529 path_cleanup_cb (void *cls)
6530 {
6531  struct DistanceVector *dv = cls;
6532  struct DistanceVectorHop *pos;
6533 
      /* This function is the timeout task itself, so forget the handle. */
6534  dv->timeout_task = NULL;
6535  while (NULL != (pos = dv->dv_head))
6536  {
6537  GNUNET_assert (dv == pos->dv);
6539  break;
6541  }
      /* `pos` is NULL only if the loop ran out of hops. */
6542  if (NULL == pos)
6543  {
6544  free_dv_route (dv);
6545  return;
6546  }
6547  dv->timeout_task =
6549 }
6550 
6551 
/**
 * Make the distance vector of `hop` usable by the virtual link to its
 * target: create a confirmed virtual link if none exists, otherwise attach
 * the distance vector to the existing link and confirm it if it was not
 * confirmed yet.
 *
 * NOTE(review): the name/parameter line, the log call openers, the
 * initialisation of several link fields, the map insertion, the scheduling
 * of `visibility_task` and the statements following the "tell CORE"
 * comments are not present in this listing.
 */
6559 static void
6561 {
6562  struct DistanceVector *dv = hop->dv;
6563  struct VirtualLink *vl;
6564 
6565  vl = lookup_virtual_link (&dv->target);
6566  if (NULL == vl)
6567  {
6568 
6569  vl = GNUNET_new (struct VirtualLink);
6571  "Creating new virtual link %p to %s using DV!\n",
6572  vl,
6573  GNUNET_i2s (&dv->target));
6574  vl->confirmed = GNUNET_YES;
6575  vl->message_uuid_ctr =
6577  vl->target = dv->target;
6583  links,
6584  &vl->target,
6585  vl,
      /* Link and distance vector reference each other. */
6587  vl->dv = dv;
6588  dv->vl = vl;
6589  vl->visibility_task =
6591  consider_sending_fc (vl);
6592  /* We lacked a confirmed connection to the target
6593  before, so tell CORE about it (finally!) */
6595  }
6596  else
6597  {
6598  /* Link was already up, remember dv is also now available and we are done */
6599  vl->dv = dv;
6600  dv->vl = vl;
6601  if (GNUNET_NO == vl->confirmed)
6602  {
6603  vl->confirmed = GNUNET_YES;
6604  vl->visibility_task =
6606  consider_sending_fc (vl);
6607  /* We lacked a confirmed connection to the target
6608  before, so tell CORE about it (finally!) */
6610  }
6611  else
6613  "Virtual link to %s could now also use DV!\n",
6614  GNUNET_i2s (&dv->target));
6615  }
6616 }
6617 
6618 
/**
 * Record a distance-vector path.  `path[0]` must be our own identity,
 * `path[1]` a direct neighbour and `path[path_len - 1]` the target.  A path
 * that is already known has its timeout and validity refreshed; otherwise a
 * new hop entry is created unless enough shorter paths exist.
 *
 * NOTE(review): the line with the function name and the declaration of
 * `path`, the log/statistics call openers, the timeout computations and a
 * few other statements are not present in this listing.
 *
 * @param path_len number of entries in `path`, at least 3
 * @param network_latency latency stored as initial aged RTT of a new hop
 * @param path_valid_until validity of the path
 * @return #GNUNET_YES if the path was added or refreshed,
 *         #GNUNET_NO if it was refreshed too quickly or enough shorter
 *         paths already exist,
 *         #GNUNET_SYSERR if the path is malformed or useless
 */
6644 static int
6646  unsigned int path_len,
6647  struct GNUNET_TIME_Relative network_latency,
6648  struct GNUNET_TIME_Absolute path_valid_until)
6649 {
6650  struct DistanceVectorHop *hop;
6651  struct DistanceVector *dv;
6652  struct Neighbour *next_hop;
6653  unsigned int shorter_distance;
6654 
6655  if (path_len < 3)
6656  {
6657  /* what a boring path! not allowed! */
6658  GNUNET_break (0);
6659  return GNUNET_SYSERR;
6660  }
6661  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
6662  next_hop = lookup_neighbour (&path[1]);
6663  if (NULL == next_hop)
6664  {
6665  /* next hop must be a neighbour, otherwise this whole thing is useless! */
6666  GNUNET_break (0);
6667  return GNUNET_SYSERR;
6668  }
6669  for (unsigned int i = 2; i < path_len; i++)
6670  if (NULL != lookup_neighbour (&path[i]))
6671  {
6672  /* Useless path: we have a direct connection to some hop
6673  in the middle of the path, so this one is not even
6674  terribly useful for redundancy */
6676  "Path of %u hops useless: directly link to hop %u (%s)\n",
6677  path_len,
6678  i,
6679  GNUNET_i2s (&path[i]));
6681  "# Useless DV path ignored: hop is neighbour",
6682  1,
6683  GNUNET_NO);
6684  return GNUNET_SYSERR;
6685  }
      /* Find or create the distance vector for the final hop (the target). */
6686  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
6687  if (NULL == dv)
6688  {
6689  dv = GNUNET_new (struct DistanceVector);
6690  dv->target = path[path_len - 1];
6692  &path_cleanup_cb,
6693  dv);
6696  dv_routes,
6697  &dv->target,
6698  dv,
6700  }
6701  /* Check if we have this path already! */
6702  shorter_distance = 0;
6703  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
6704  pos = pos->next_dv)
6705  {
6706  if (pos->distance < path_len - 3)
6707  shorter_distance++;
6708  /* Note that the distances in 'pos' excludes us (path[0]),
6709  the next_hop (path[1]) and the target so we need to subtract three
6710  and check next_hop explicitly */
6711  if ((pos->distance == path_len - 3) && (pos->next_hop == next_hop))
6712  {
6713  int match = GNUNET_YES;
6714 
      /* Compare the intermediate hops one by one. */
6715  for (unsigned int i = 0; i < pos->distance; i++)
6716  {
6717  if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
6718  {
6719  match = GNUNET_NO;
6720  break;
6721  }
6722  }
6723  if (GNUNET_YES == match)
6724  {
6725  struct GNUNET_TIME_Relative last_timeout;
6726 
6727  /* Re-discovered known path, update timeout */
6729  "# Known DV path refreshed",
6730  1,
6731  GNUNET_NO);
6732  last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
6733  pos->timeout =
6735  pos->path_valid_until =
6736  GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
      /* Removing and re-inserting moves the refreshed entry within the
         list. */
6737  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
6738  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
6739  if (0 <
6742  if (last_timeout.rel_value_us <
6745  .rel_value_us)
6746  {
6747  /* Some peer send DV learn messages too often, we are learning
6748  the same path faster than it would be useful; do not forward! */
6750  "Rediscovered path too quickly, not forwarding further\n");
6751  return GNUNET_NO;
6752  }
6754  "Refreshed known path to %s valid until %s, forwarding further\n",
6755  GNUNET_i2s (&dv->target),
6757  pos->path_valid_until));
6758  return GNUNET_YES;
6759  }
6760  }
6761  }
6762  /* Count how many shorter paths we have (incl. direct
6763  neighbours) before simply giving up on this one! */
6764  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
6765  {
6766  /* We have a shorter path already! */
6768  "Have many shorter DV paths %s, not forwarding further\n",
6769  GNUNET_i2s (&dv->target));
6770  return GNUNET_NO;
6771  }
6772  /* create new DV path entry */
6774  "Discovered new DV path to %s valid until %s\n",
6775  GNUNET_i2s (&dv->target),
6776  GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
      /* The intermediate hops are stored right behind the hop structure. */
6777  hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
6778  + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
6779  hop->next_hop = next_hop;
6780  hop->dv = dv;
6781  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
6782  memcpy (&hop[1],
6783  &path[2],
6784  sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
6786  hop->path_valid_until = path_valid_until;
6787  hop->distance = path_len - 3;
6788  hop->pd.aged_rtt = network_latency;
      /* The hop is linked both into the target's list and into the list of
         the neighbour it goes through. */
6789  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
6790  GNUNET_CONTAINER_MDLL_insert (neighbour,
6791  next_hop->dv_head,
6792  next_hop->dv_tail,
6793  hop);
6794  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
6796  return GNUNET_YES;
6797 }
6798 
6799 
6807 static int
6808 check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
6809 {
6810  uint16_t size = ntohs (dvl->header.size);
6811  uint16_t num_hops = ntohs (dvl->num_hops);
6812  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
6813 
6814  (void) cls;
6815  if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
6816  {
6817  GNUNET_break_op (0);
6818  return GNUNET_SYSERR;
6819  }
6820  if (num_hops > MAX_DV_HOPS_ALLOWED)
6821  {
6822  GNUNET_break_op (0);
6823  return GNUNET_SYSERR;
6824  }
6825  for (unsigned int i = 0; i < num_hops; i++)
6826  {
6827  if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
6828  {
6829  GNUNET_break_op (0);
6830  return GNUNET_SYSERR;
6831  }
6832  if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
6833  {
6834  GNUNET_break_op (0);
6835  return GNUNET_SYSERR;
6836  }
6837  }
6838  return GNUNET_YES;
6839 }
6840 
6841 
/**
 * Build a copy of DV learn message `msg` with our own identity appended as
 * an additional hop and pass it on towards `next_hop`, via a confirmed
 * virtual link if one exists, otherwise via the direct neighbour.
 *
 * NOTE(review): the log call opener, the header type assignment, the delay
 * computation, the signature purpose and signing call, and the routing
 * calls are not present in this listing; `in_time` and `nnd` are not used
 * in the visible lines.
 *
 * @param next_hop peer to forward to
 * @param msg message being forwarded
 * @param bi_history value written into the `bidirectional` field
 * @param nhops number of entries in `hops`
 * @param hops path so far, copied into the forwarded message
 * @param in_time time the message was received
 */
6853 static void
6854 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
6855  const struct TransportDVLearnMessage *msg,
6856  uint16_t bi_history,
6857  uint16_t nhops,
6858  const struct DVPathEntryP *hops,
6859  struct GNUNET_TIME_Absolute in_time)
6860 {
6861  struct Neighbour *n;
6862  struct VirtualLink *vl;
6863  struct DVPathEntryP *dhops;
      /* Room for the header plus the existing hops and one more for us. */
6864  char buf[sizeof(struct TransportDVLearnMessage)
6865  + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
6866  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
6867  struct GNUNET_TIME_Relative nnd;
6868 
6869  /* compute message for forwarding */
6871  "Forwarding DV learn message originating from %s to %s\n",
6872  GNUNET_i2s (&msg->initiator),
6873  GNUNET_i2s2 (next_hop));
6876  fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
6877  + (nhops + 1) * sizeof(struct DVPathEntryP));
6878  fwd->num_hops = htons (nhops + 1);
6879  fwd->bidirectional = htons (bi_history);
6882  msg->non_network_delay));
      /* The initiator's fields are carried over unchanged. */
6884  fwd->init_sig = msg->init_sig;
6885  fwd->initiator = msg->initiator;
6886  fwd->challenge = msg->challenge;
6887  fwd->monotonic_time = msg->monotonic_time;
6888  dhops = (struct DVPathEntryP *) &fwd[1];
6889  GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
6890  dhops[nhops].hop = GST_my_identity;
6891  {
      /* Data for our hop signature: predecessor (the initiator if there are
         no hops yet), successor and the challenge. */
6892  struct DvHopPS dhp = {
6894  .purpose.size = htonl (sizeof(dhp)),
6895  .pred = (0 == nhops) ? msg->initiator : dhops[nhops - 1].hop,
6896  .succ = *next_hop,
6897  .challenge = msg->challenge
6898  };
6900  &dhp,
6901  &dhops[nhops].hop_sig);
6902  }
6903  /*route_control_message_without_fc (next_hop,
6904  &fwd->header,
6905  RMO_UNCONFIRMED_ALLOWED);*/
6906  vl = lookup_virtual_link (next_hop);
6907  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6908  {
6910  &fwd->header,
6912  }
6913  else
6914  {
6915  /* Use route via neighbour */
6916  n = lookup_neighbour (next_hop);
6917  if (NULL != n)
6919  n,
6920  &fwd->header,
6922  }
6923 }
6924 
6925 
/**
 * Verify the initiator's signature `init_sig` over the monotonic time and
 * the challenge, using the public key of `init`.
 *
 * NOTE(review): the name line, the `challenge` parameter declaration, the
 * signature purpose constant and the verification call opener are not
 * present in this listing.
 *
 * @param sender_monotonic_time monotonic time covered by the signature
 * @param init identity of the initiator
 * @param init_sig signature to verify
 * @return #GNUNET_OK if the signature is valid, #GNUNET_SYSERR otherwise
 */
6935 static int
6937  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
6938  const struct GNUNET_PeerIdentity *init,
6940  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
6941 {
      /* Reconstruct the signed data. */
6942  struct DvInitPS ip = { .purpose.purpose = htonl (
6944  .purpose.size = htonl (sizeof(ip)),
6945  .monotonic_time = sender_monotonic_time,
6946  .challenge = *challenge };
6947 
6948  if (
6949  GNUNET_OK !=
6951  &ip,
6952  init_sig,
6953  &init->public_key))
6954  {
6955  GNUNET_break_op (0);
6956  return GNUNET_SYSERR;
6957  }
6958  return GNUNET_OK;
6959 }
6960 
6961 
/* Body of a structure whose header line and several member declarations are
   not present in this listing.
   NOTE(review): presumably `struct NeighbourSelectionContext`, which the
   neighbour iteration callbacks below access through members `dvl`, `hops`,
   `in_time`, `selections`, `num_eligible`, `num_selections`, `nhops` and
   `bi_history` -- confirm against the full file. */
6966 {
6971 
6975  const struct DVPathEntryP *hops;
6976 
6981 
6986 
6990  unsigned int num_eligible;
6991 
6995  unsigned int num_selections;
6996 
7000  uint16_t nhops;
7001 
7005  uint16_t bi_history;
7006 };
7007 
7008 
/**
 * Per-neighbour callback counting eligible peers: `num_eligible` in the
 * selection context is incremented for every peer that is neither the
 * initiator of the DV learn message nor one of the hops on its path.
 *
 * NOTE(review): the line with the function name and the `cls` parameter is
 * not present in this listing.
 *
 * @param pid identity of the neighbour
 * @param value unused
 * @return always #GNUNET_YES
 */
7017 static int
7019  const struct GNUNET_PeerIdentity *pid,
7020  void *value)
7021 {
7022  struct NeighbourSelectionContext *nsc = cls;
7023 
7024  (void) value;
7025  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7026  return GNUNET_YES; /* skip initiator */
7027  for (unsigned int i = 0; i < nsc->nhops; i++)
7028  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7029  return GNUNET_YES;
7030  /* skip peers on path */
7031  nsc->num_eligible++;
7032  return GNUNET_YES;
7033 }
7034 
7035 
/**
 * Per-neighbour callback of the transmission pass.  Peers are filtered
 * exactly as in the counting pass; each remaining peer receives a running
 * index (`num_eligible`), and if that index is listed in `selections` the
 * DV learn message is forwarded.
 *
 * NOTE(review): the name line, the log call opener and the opener of the
 * forwarding call are not present in this listing.
 *
 * @param pid identity of the neighbour
 * @param value unused
 * @return always #GNUNET_YES
 */
7046 static int
7048  const struct GNUNET_PeerIdentity *pid,
7049  void *value)
7050 {
7051  struct NeighbourSelectionContext *nsc = cls;
7052 
7054  "transmission %s\n",
7055  GNUNET_i2s (pid));
7056  (void) value;
7057  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7058  return GNUNET_YES; /* skip initiator */
7059  for (unsigned int i = 0; i < nsc->nhops; i++)
7060  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7061  return GNUNET_YES;
7062  /* skip peers on path */
7063  for (unsigned int i = 0; i < nsc->num_selections; i++)
7064  {
      /* A peer is handled at most once even if its index is listed several
         times. */
7065  if (nsc->selections[i] == nsc->num_eligible)
7066  {
7068  nsc->dvl,
7069  nsc->bi_history,
7070  nsc->nhops,
7071  nsc->hops,
7072  nsc->in_time);
7073  break;
7074  }
7075  }
7076  nsc->num_eligible++;
7077  return GNUNET_YES;
7078 }
7079 
7080 
/**
 * Compute to how many neighbours a DV learn message should be forwarded.
 * Starting from a fixed target total, the amount attributed to each earlier
 * hop distance is subtracted; the remainder is scaled for the current
 * distance, floored, and may be incremented by one.
 *
 * NOTE(review): the right-hand side of the round-up comparison and the log
 * call opener are not present in this listing.
 *
 * @param hops_taken number of hops the message has taken, must be below 64
 * @param neighbour_count total number of neighbours
 * @param eligible_count number of neighbours eligible for forwarding
 * @return number of neighbours to forward to; 0 if @a hops_taken is 64 or
 *         more
 */
7124 static unsigned int
7125 calculate_fork_degree (unsigned int hops_taken,
7126  unsigned int neighbour_count,
7127  unsigned int eligible_count)
7128 {
7129  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
      /* NOTE(review): there is no guard against neighbour_count == 0 in this
         function -- confirm callers never pass 0. */
7130  double eligible_ratio =
7131  ((double) eligible_count) / ((double) neighbour_count);
7132  double boost_factor = eligible_ratio * eligible_ratio;
7133  unsigned int rnd;
7134  double left;
7135 
7136  if (hops_taken >= 64)
7137  {
7138  GNUNET_break (0);
7139  return 0; /* precaution given bitshift below */
7140  }
7141  for (unsigned int i = 1; i < hops_taken; i++)
7142  {
7143  /* For each hop, subtract the expected number of targets
7144  reached at distance d (so what remains divided by 2^d) */
7145  target_total -= (target_total * boost_factor / (1LLU << i));
7146  }
7147  rnd =
7148  (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
7149  /* round up or down probabilistically depending on how close we were
7150  when floor()ing to rnd */
7151  left = target_total - (double) rnd;
7152  if (UINT32_MAX * left >
7154  rnd++; /* round up */
7156  "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
7157  hops_taken,
7158  rnd,
7159  eligible_count,
7160  neighbour_count);
7161  return rnd;
7162 }
7163 
7164 
/**
 * Completion callback for storing a peer's monotonic time.
 *
 * Clears the neighbour's @e sc field and, when the operation did not
 * succeed, emits the failure message below.
 *
 * NOTE(review): this listing skips source line 7178 (numbering jumps
 * 7177 -> 7179), so the opening of the call that takes the failure
 * message is not visible here.
 *
 * @param cls closure, used as a `struct Neighbour *`
 * @param success #GNUNET_YES if the store succeeded; any other value is
 *        treated as failure
 */
7171 static void
7172 neighbour_store_dvmono_cb (void *cls, int success)
7173 {
7174  struct Neighbour *n = cls;
7175 
/* presumably 'sc' is the handle of the pending store operation, which is
   finished once this callback runs -- verify against struct Neighbour */
7176  n->sc = NULL;
7177  if (GNUNET_YES != success)
7179  "Failed to store other peer's monotonic time in peerstore!\n");
7180 }
7181 
7182 
7190 static void
7191 handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7192 {
7193  struct CommunicatorMessageContext *cmc = cls;
7195  int bi_hop;
7196  uint16_t nhops;
7197  uint16_t bi_history;
7198  const struct DVPathEntryP *hops;
7199  int do_fwd;
7200  int did_initiator;
7201  struct GNUNET_TIME_Absolute in_time;
7202  struct Neighbour *n;
7203 
7204  nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */
7205  bi_history = ntohs (dvl->bidirectional);
7206  hops = (const struct DVPathEntryP *) &dvl[1];
7207  if (0 == nhops)
7208  {
7209  /* sanity check */
7210  if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
7211  {
7212  GNUNET_break (0);
7213  finish_cmc_handling (cmc);
7214  return;
7215  }
7216  }
7217  else
7218  {
7220  "handle dv learn message last hop %s\n",
7221  GNUNET_i2s (&hops[nhops - 1].hop));
7222  /* sanity check */
7223  if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
7224  {
7225  GNUNET_break (0);
7226  finish_cmc_handling (cmc);
7227  return;
7228  }
7229  }
7230 
7231  GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
7232  cc = cmc->tc->details.communicator.cc;
7233  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
7234  cc); // FIXME: add bi-directional flag to cc?
7235  in_time = GNUNET_TIME_absolute_get ();
7236 
7237  /* continue communicator here, everything else can happen asynchronous! */
7238  finish_cmc_handling (cmc);
7239 
7240  n = lookup_neighbour (&dvl->initiator);
7241  if (NULL != n)
7242  {
7243  if ((n->dv_monotime_available == GNUNET_YES) &&
7246  {
7248  "DV learn from %s discarded due to time travel",
7249  GNUNET_i2s (&dvl->initiator));
7251  "# DV learn discarded due to time travel",
7252  1,
7253  GNUNET_NO);
7254  return;
7255  }
7257  &dvl->initiator,
7258  &dvl->challenge,
7259  &dvl->init_sig))
7260  {
7262  "DV learn signature from %s invalid\n",
7263  GNUNET_i2s (&dvl->initiator));
7264  GNUNET_break_op (0);
7265  return;
7266  }
7269  {
7270  if (NULL != n->sc)
7271  {
7273  "store cancel\n");
7275  }
7276  n->sc =
7278  "transport",
7279  &dvl->initiator,
7281  &dvl->monotonic_time,
7282  sizeof(dvl->monotonic_time),
7286  n);
7287  }
7288  }
7289  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
7290  If signature verification load too high, implement random drop strategy */
7291  for (unsigned int i = 0; i < nhops; i++)
7292  {
7293  struct DvHopPS dhp = { .purpose.purpose =
7295  .purpose.size = htonl (sizeof(dhp)),
7296  .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
7297  .succ = (nhops == i + 1) ? GST_my_identity
7298  : hops[i + 1].hop,
7299  .challenge = dvl->challenge };
7300 
7301  if (GNUNET_OK !=
7303  &dhp,
7304  &hops[i].hop_sig,
7305  &hops[i].hop.public_key))
7306  {
7308  "DV learn from %s signature of hop %u invalid\n",
7309  GNUNET_i2s (&dvl->initiator),
7310  i);
7312  "signature of hop %s invalid\n",
7313  GNUNET_i2s (&hops[i].hop));
7315  "pred %s\n",
7316  GNUNET_i2s (&dhp.pred));
7318  "succ %s\n",
7319  GNUNET_i2s (&dhp.succ));
7321  "hash %s\n",
7322  GNUNET_sh2s (&dhp.challenge.value));
7323  GNUNET_break_op (0);
7324  return;
7325  }
7326  }
7327  if (GNUNET_EXTRA_LOGGING > 0)
7328  {
7329  char *path;
7330 
7331  path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
7332  for (unsigned int i = 0; i < nhops; i++)
7333  {
7334  char *tmp;
7335 
7336  GNUNET_asprintf (&tmp,
7337  "%s%s%s",
7338  path,
7339  (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
7340  GNUNET_i2s (&hops[i].hop));
7341  GNUNET_free (path);
7342  path = tmp;
7343  }
7345  "Received DVInit via %s%s%s\n",
7346  path,
7347  bi_hop ? "<->" : "-->",
7349  GNUNET_free (path);
7350  }
7351  do_fwd = GNUNET_YES;
7352  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
7353  {
7354  struct GNUNET_PeerIdentity path[nhops + 1];
7355  struct GNUNET_TIME_Relative host_latency_sum;
7356  struct GNUNET_TIME_Relative latency;
7357  struct GNUNET_TIME_Relative network_latency;
7358  struct GNUNET_TIME_Absolute now;
7359 
7360  /* We initiated this, learn the forward path! */
7361  path[0] = GST_my_identity;
7362  path[1] = hops[0].hop;
7363  host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
7364 
7365  // Need also something to lookup initiation time
7366  // to compute RTT! -> add RTT argument here?
7367  now = GNUNET_TIME_absolute_get ();
7369  dvl->monotonic_time));
7370  GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
7371  // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
7372  // (based on dvl->challenge, we can identify time of origin!)
7373 
7374  network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
7375  /* assumption: latency on all links is the same */
7376  network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
7377 
7378  for (unsigned int i = 2; i <= nhops; i++)
7379  {
7380  struct GNUNET_TIME_Relative ilat;
7381 
7382  /* assumption: linear latency increase per hop */
7383  ilat = GNUNET_TIME_relative_multiply (network_latency, i);
7384  path[i] = hops[i - 1].hop;
7386  "Learned path with %u hops to %s with latency %s\n",
7387  i,
7388  GNUNET_i2s (&path[i]),
7390  learn_dv_path (path,
7391  i + 1,
7392  ilat,
7395  }
7396  /* as we initiated, do not forward again (would be circular!) */
7397  do_fwd = GNUNET_NO;
7398  return;
7399  }
7400  if (bi_hop)
7401  {
7402  /* last hop was bi-directional, we could learn something here! */
7403  struct GNUNET_PeerIdentity path[nhops + 2];
7404 
7405  path[0] = GST_my_identity;
7406  path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
7407  for (unsigned int i = 0; i < nhops; i++)
7408  {
7409  int iret;
7410 
7411  if (0 == (bi_history & (1 << i)))
7412  break; /* i-th hop not bi-directional, stop learning! */
7413  if (i == nhops - 1)
7414  {
7415  path[i + 2] = dvl->initiator;
7416  }
7417  else
7418  {
7419  path[i + 2] = hops[nhops - i - 2].hop;
7420  }
7421 
7423  "Learned inverse path with %u hops to %s\n",
7424  i + 2,
7425  GNUNET_i2s (&path[i + 2]));
7426  iret = learn_dv_path (path,
7427  i + 3,
7430  if (GNUNET_SYSERR == iret)
7431  {
7432  /* path invalid or too long to be interesting for US, thus should also
7433  not be interesting to our neighbours, cut path when forwarding to
7434  'i' hops, except of course for the one that goes back to the
7435  initiator */
7437  "# DV learn not forwarded due invalidity of path",
7438  1,
7439  GNUNET_NO);
7440  do_fwd = GNUNET_NO;
7441  break;
7442  }
7443  if ((GNUNET_NO == iret) && (nhops == i + 1))
7444  {
7445  /* we have better paths, and this is the longest target,
7446  so there cannot be anything interesting later */
7448  "# DV learn not forwarded, got better paths",
7449  1,
7450  GNUNET_NO);
7451  do_fwd = GNUNET_NO;
7452  break;
7453  }
7454  }
7455  }
7456  if (MAX_DV_HOPS_ALLOWED == nhops)
7457  {
7458  /* At limit, we're out of here! */
7459  return;
7460  }
7461 
7462  /* Forward to initiator, if path non-trivial and possible */
7463  bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
7464  did_initiator = GNUNET_NO;
7465  if ((1 <= nhops) &&
7466  (GNUNET_YES ==
7468  {
7469  /* send back to origin! */
7471  "Sending DVL bac