GNUnet  0.19.3
gnunet-service-tng.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
75 #include "platform.h"
76 #include "gnunet_util_lib.h"
80 #include "gnunet_hello_lib.h"
81 #include "gnunet_signatures.h"
82 #include "transport.h"
83 
87 #define RING_BUFFER_SIZE 16
88 
92 #define MAX_FC_RETRANSMIT_COUNT 1000
93 
98 #define MAX_CUMMULATIVE_ACKS 64
99 
112 #define FC_NO_CHANGE_REPLY_PROBABILITY 8
113 
118 #define IN_PACKET_SIZE_WITHOUT_MTU 128
119 
124 #define GOODPUT_AGING_SLOTS 4
125 
130 #define DEFAULT_WINDOW_SIZE (128 * 1024)
131 
140 #define MAX_INCOMING_REQUEST 16
141 
146 #define MAX_DV_DISCOVERY_SELECTION 16
147 
156 #define RECV_WINDOW_SIZE 4
157 
165 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
166 
170 #define MAX_DV_HOPS_ALLOWED 16
171 
176 #define MAX_DV_LEARN_PENDING 64
177 
181 #define MAX_DV_PATHS_TO_TARGET 3
182 
188 #define DELAY_WARN_THRESHOLD \
189  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
190 
195 #define DV_FORWARD_TIMEOUT \
196  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
197 
201 #define DEFAULT_ACK_WAIT_DURATION \
202  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
203 
209 #define DV_QUALITY_RTT_THRESHOLD \
210  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
211 
216 #define DV_PATH_VALIDITY_TIMEOUT \
217  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
218 
223 #define BACKCHANNEL_INACTIVITY_TIMEOUT \
224  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
225 
230 #define DV_PATH_DISCOVERY_FREQUENCY \
231  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
232 
236 #define EPHEMERAL_VALIDITY \
237  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
238 
242 #define REASSEMBLY_EXPIRATION \
243  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
244 
249 #define FAST_VALIDATION_CHALLENGE_FREQ \
250  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
251 
255 #define MAX_VALIDATION_CHALLENGE_FREQ \
256  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
257 
263 #define ACK_CUMMULATOR_TIMEOUT \
264  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
265 
270 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
271 
276 #define DV_LEARN_QUALITY_THRESHOLD 100
277 
281 #define MAX_ADDRESS_VALID_UNTIL \
282  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
283 
287 #define ADDRESS_VALIDATION_LIFETIME \
288  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
289 
296 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
297 
304 #define VALIDATION_RTT_BUFFER_FACTOR 3
305 
312 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
313 
319 #define QUEUE_LENGTH_LIMIT 32
320 
321 
323 
328 {
333  uint64_t uuid GNUNET_PACKED;
334 };
335 
336 
341 {
345  struct GNUNET_Uuid value;
346 };
347 
352 {
357 
358  /* Followed by *another* message header which is the message to
359  the communicator */
360 
361  /* Followed by a 0-terminated name of the communicator */
362 };
363 
364 
369 {
374 
390 
395 
401 };
402 
403 
409 {
414 
420 
432 
433  /* Followed by a `struct GNUNET_MessageHeader` with a message
434  for the target peer */
435 };
436 
437 
443 {
448 
456 
463 };
464 
465 
470 {
478 
483 };
484 
485 
494 {
499 
505 
506  /* followed by any number of `struct TransportCummulativeAckPayloadP`
507  messages providing ACKs */
508 };
509 
510 
515 {
520 
525 
530 
539 
544  struct MessageUUIDP msg_uuid;
545 };
546 
547 
565 struct DvInitPS
566 {
571 
585 
590 };
591 
592 
609 struct DvHopPS
610 {
615 
619  struct GNUNET_PeerIdentity pred;
620 
624  struct GNUNET_PeerIdentity succ;
625 
630 };
631 
632 
638 {
642  struct GNUNET_PeerIdentity hop;
643 
649 };
650 
651 
666 {
671 
677 
687 
694 
708 
714 
719 
724 
725  /* Followed by @e num_hops `struct DVPathEntryP` values,
726  excluding the initiator of the DV trace; the last entry is the
727  current sender; the current peer must not be included. */
728 };
729 
730 
754 {
759 
763  unsigned int without_fc;
764 
772 
779 
785 
791  struct GNUNET_ShortHashCode iv;
792 
798  struct GNUNET_HashCode hmac;
799 
806 
807  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
808  excluding the @e origin and the current peer, the last must be
809  the ultimate target; if @e num_hops is zero, the receiver of this
810  message is the ultimate target. */
811 
812  /* Followed by encrypted, variable-size payload, which
813  must begin with a `struct TransportDVBoxPayloadP` */
814 
815  /* Followed by the actual message, which itself must not be a
816  a DV_LEARN or DV_BOX message! */
817 };
818 
819 
825 {
830 
835 
840 
846 };
847 
848 
854 {
859 
865 
870 };
871 
872 
878 {
883 
888 
894 
899 
904  struct GNUNET_TIME_AbsoluteNBO origin_time;
905 
910  struct GNUNET_TIME_RelativeNBO validity_duration;
911 };
912 
913 
923 {
928 
936  uint32_t seq GNUNET_PACKED;
937 
943 
950 
960 
970 };
971 
972 
974 
975 
980 {
984  CT_NONE = 0,
985 
989  CT_CORE = 1,
990 
995 
1000 
1004  CT_APPLICATION = 4
1005 };
1006 
1007 
1013 {
1018 
1023 
1028 
1033 
1039  RMO_REDUNDANT = 4
1040 };
1041 
1042 
1047 {
1052 
1057 
1062 
1068 };
1069 
1070 
1076 {
1080  uint64_t bytes_sent;
1081 
1086  uint64_t bytes_received;
1087 };
1088 
1089 
1094 {
1099 
1105 
1110  unsigned int last_age;
1111 };
1112 
1113 
1117 struct TransportClient;
1118 
1122 struct Neighbour;
1123 
1128 struct DistanceVector;
1129 
1134 struct Queue;
1135 
1139 struct PendingMessage;
1140 
1144 struct DistanceVectorHop;
1145 
1154 struct VirtualLink;
1155 
1156 
1162 {
1168 
1174 
1179 
1183  struct GNUNET_TRANSPORT_IncomingMessage im;
1184 
1188  const struct GNUNET_MessageHeader *mh;
1189 
1194  uint16_t total_hops;
1195 };
1196 
1197 
1202 {
1207 
1212 };
1213 
1214 
1219 {
1224 
1229 
1233  struct VirtualLink *vl;
1234 
1238  uint16_t size;
1239 
1246  uint16_t isize;
1247 };
1248 
1249 
1254 {
1259  struct MessageUUIDP msg_uuid;
1260 
1265 
1270 
1278  uint8_t *bitfield;
1279 
1284 
1290 
1294  uint16_t msg_size;
1295 
1300  uint16_t msg_missing;
1301 
1302  /* Followed by @e msg_size bytes of the (partially) defragmented original
1303  * message */
1304 
1305  /* Followed by @e bitfield data */
1306 };
1307 
1308 
1318 {
1322  struct GNUNET_PeerIdentity target;
1323 
1330 
1337 
1342 
1348 
1354 
1359 
1364 
1369 
1374 
1382 
1388 
1392  unsigned int fc_retransmit_count;
1393 
1398  unsigned int confirmed;
1399 
1403  struct Neighbour *n;
1404 
1409 
1416 
1423 
1432 
1438 
1444 
1453 
1461 
1468 
1477 
1490 
1496 
1503 
1514 
1519  uint32_t fc_seq_gen;
1520 
1526  uint32_t last_fc_seq;
1527 
1540 };
1541 
1542 
1547 {
1553 
1559 
1566 
1573 
1580 
1587 
1594 
1601 
1606 
1612 
1618 
1623  struct Queue *queue;
1624 
1629 
1633  uint16_t message_size;
1634 };
1635 
1636 
1641 {
1646 
1651 
1656 
1661 
1666 
1671 
1676 
1681 
1687  const struct GNUNET_PeerIdentity *path;
1688 
1694 
1703 
1707  struct PerformanceData pd;
1708 
1714  unsigned int distance;
1715 };
1716 
1717 
1723 {
1727  struct GNUNET_PeerIdentity target;
1728 
1733 
1738 
1743 
1748  struct VirtualLink *vl;
1749 
1755 
1760 
1765 
1770 
1775 };
1776 
1777 
1787 struct QueueEntry
1788 {
1792  struct QueueEntry *next;
1793 
1797  struct QueueEntry *prev;
1798 
1802  struct Queue *queue;
1803 
1808 
1812  uint64_t mid;
1813 };
1814 
1815 
1820 struct Queue
1821 {
1826 
1831 
1836 
1841 
1846 
1851 
1856 
1861 
1866 
1871 
1875  const char *address;
1876 
1880  unsigned int unlimited_length;
1881 
1887 
1896 
1900  struct PerformanceData pd;
1901 
1906  uint64_t mid_gen;
1907 
1911  uint32_t qid;
1912 
1916  uint32_t mtu;
1917 
1922 
1927 
1931  unsigned int queue_length;
1932 
1936  uint64_t q_capacity;
1937 
1941  uint32_t priority;
1942 
1946  enum GNUNET_NetworkType nt;
1947 
1952 
1957  int idle;
1958 };
1959 
1960 
1964 struct Neighbour
1965 {
1969  struct GNUNET_PeerIdentity pid;
1970 
1976 
1982 
1987 
1992 
1998 
2004 
2009  struct VirtualLink *vl;
2010 
2016 
2022 };
2023 
2024 
2030 {
2035 
2040 
2045 
2049  struct GNUNET_PeerIdentity pid;
2050 };
2051 
2052 
2056 struct PeerRequest
2057 {
2061  struct GNUNET_PeerIdentity pid;
2062 
2067 
2072 
2079 
2084 };
2085 
2086 
2091 {
2096 
2101 
2106 
2110  PMT_DV_BOX = 3
2111 };
2112 
2113 
2140 struct PendingMessage
2141 {
2146 
2151 
2156 
2161 
2167 
2173 
2178 
2183 
2189 
2194  struct VirtualLink *vl;
2195 
2199  struct GNUNET_PeerIdentity target;
2200 
2209  struct QueueEntry *qe;
2210 
2215 
2220 
2225 
2230 
2235 
2240 
2245  struct MessageUUIDP msg_uuid;
2246 
2251  unsigned long long logging_uuid;
2252 
2256  enum PendingMessageType pmt;
2257 
2263 
2268 
2272  uint16_t bytes_msg;
2273 
2277  uint16_t frag_off;
2278 
2282  int16_t msg_uuid_set;
2283 
2284  /* Followed by @e bytes_msg to transmit */
2285 };
2286 
2287 
2292 {
2298 
2303 };
2304 
2305 
2311 {
2315  struct GNUNET_PeerIdentity target;
2316 
2321 
2328 
2333 
2339  uint32_t ack_counter;
2340 
2344  unsigned int num_acks;
2345 };
2346 
2347 
2352 {
2357 
2362 
2367 
2371  const char *address;
2372 
2377 
2382 
2388 
2392  uint32_t aid;
2393 
2397  enum GNUNET_NetworkType nt;
2398 };
2399 
2400 
2405 {
2410 
2415 
2420 
2425 
2429  enum ClientType type;
2430 
2431  union
2432  {
2436  struct
2437  {
2443 
2448  } core;
2449 
2453  struct
2454  {
2460  struct GNUNET_PeerIdentity peer;
2461 
2467 
2468 
2472  struct
2473  {
2479 
2484 
2489 
2495 
2501 
2507  unsigned int total_queue_length;
2508 
2514 
2518  struct
2519  {
2527 };
2528 
2529 
2535 {
2540  struct GNUNET_PeerIdentity pid;
2541 
2549 
2555 
2562  struct GNUNET_TIME_Absolute first_challenge_use;
2563 
2570  struct GNUNET_TIME_Absolute last_challenge_use;
2571 
2579  struct GNUNET_TIME_Absolute next_challenge;
2580 
2589  struct GNUNET_TIME_Relative challenge_backoff;
2590 
2595  struct GNUNET_TIME_Relative validation_rtt;
2596 
2604  struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2605 
2609  char *address;
2610 
2616  struct GNUNET_CONTAINER_HeapNode *hn;
2617 
2623 
2629  uint32_t last_window_consum_limit;
2630 
2635  int awaiting_queue;
2636 };
2637 
2638 
2646 {
2650  struct GNUNET_PeerIdentity pid;
2651 
2656 
2661 
2666 
2672 
2677 
2683 
2689 
2694  size_t body_size;
2695 };
2696 
2697 
2702 
2706 static unsigned int ring_buffer_head;
2707 
2711 static unsigned int is_ring_buffer_full;
2712 
2717 
2721 static unsigned int ring_buffer_dv_head;
2722 
2726 static unsigned int is_ring_buffer_dv_full;
2727 
2732 
2737 
2742 
2747 
2751 static struct GNUNET_PeerIdentity GST_my_identity;
2752 
2757 
2763 
2769 
2775 
2781 
2787 
2793 
2799 
2804 
2808 static struct LearnLaunchEntry *lle_head = NULL;
2809 
2813 static struct LearnLaunchEntry *lle_tail = NULL;
2814 
2821 
2826 
2831 
2836 
2842 static struct IncomingRequest *ir_head;
2843 
2847 static struct IncomingRequest *ir_tail;
2848 
2852 static unsigned int ir_total;
2853 
2857 static unsigned long long logging_uuid_gen;
2858 
2868 
2873 static int in_shutdown;
2874 
2885 static unsigned int
2887 {
2888  struct GNUNET_TIME_Absolute now;
2889 
2890  now = GNUNET_TIME_absolute_get ();
2891  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2892 }
2893 
2894 
2900 static void
2902 {
2904  GNUNET_assert (ir_total > 0);
2905  ir_total--;
2907  ir->wc = NULL;
2908  GNUNET_free (ir);
2909 }
2910 
2911 
2917 static void
2919 {
2920  struct Queue *q = pa->queue;
2921  struct PendingMessage *pm = pa->pm;
2922  struct DistanceVectorHop *dvh = pa->dvh;
2923 
2925  "free_pending_acknowledgement\n");
2926  if (NULL != q)
2927  {
2928  GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
2929  pa->queue = NULL;
2930  }
2931  if (NULL != pm)
2932  {
2934  "remove pa from message\n");
2936  "remove pa from message %llu\n",
2937  pm->logging_uuid);
2939  "remove pa from message %u\n",
2940  pm->pmt);
2942  "remove pa from message %s\n",
2943  GNUNET_uuid2s (&pa->ack_uuid.value));
2944  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2945  pa->pm = NULL;
2946  }
2947  if (NULL != dvh)
2948  {
2949  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
2950  pa->queue = NULL;
2951  }
2954  &pa->ack_uuid.value,
2955  pa));
2956  GNUNET_free (pa);
2957 }
2958 
2959 
2968 static void
2970 {
2971  struct PendingMessage *frag;
2972 
2973  while (NULL != (frag = root->head_frag))
2974  {
2975  struct PendingAcknowledgement *pa;
2976 
2977  free_fragment_tree (frag);
2978  while (NULL != (pa = frag->pa_head))
2979  {
2980  GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
2981  pa->pm = NULL;
2982  }
2983  GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
2984  if (NULL != frag->qe)
2985  {
2986  GNUNET_assert (frag == frag->qe->pm);
2987  frag->qe->pm = NULL;
2989  frag->qe->queue->queue_tail,
2990  frag->qe);
2991  frag->qe->queue->queue_length--;
2993  "Removing QueueEntry MID %lu from queue\n",
2994  frag->qe->mid);
2995  GNUNET_free (frag->qe);
2996  }
2998  "Free frag %p\n",
2999  frag);
3000  GNUNET_free (frag);
3001  }
3002 }
3003 
3004 
3012 static void
3014 {
3015  struct TransportClient *tc = pm->client;
3016  struct VirtualLink *vl = pm->vl;
3017  struct PendingAcknowledgement *pa;
3018 
3020  "Freeing pm %p\n",
3021  pm);
3022  if (NULL != tc)
3023  {
3025  tc->details.core.pending_msg_head,
3026  tc->details.core.pending_msg_tail,
3027  pm);
3028  }
3029  if ((NULL != vl) && (NULL == pm->frag_parent))
3030  {
3032  "Removing pm %llu\n",
3033  pm->logging_uuid);
3035  vl->pending_msg_head,
3036  vl->pending_msg_tail,
3037  pm);
3038  }
3039  while (NULL != (pa = pm->pa_head))
3040  {
3041  if (NULL == pa)
3043  "free pending pa null\n");
3044  if (NULL == pm->pa_tail)
3046  "free pending pa_tail null\n");
3047  if (NULL == pa->prev_pa)
3049  "free pending pa prev null\n");
3050  if (NULL == pa->next_pa)
3052  "free pending pa next null\n");
3053  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3054  pa->pm = NULL;
3055  }
3056 
3058  if (NULL != pm->qe)
3059  {
3060  GNUNET_assert (pm == pm->qe->pm);
3061  pm->qe->pm = NULL;
3062  GNUNET_CONTAINER_DLL_remove (pm->qe->queue->queue_head,
3063  pm->qe->queue->queue_tail,
3064  pm->qe);
3065  pm->qe->queue->queue_length--;
3067  "Removing QueueEntry MID %lu from queue\n",
3068  pm->qe->mid);
3069  GNUNET_free (pm->qe);
3071  "QueueEntry MID freed\n");
3072  }
3073  if (NULL != pm->bpm)
3074  {
3075  free_fragment_tree (pm->bpm);
3076  GNUNET_free (pm->bpm);
3077  }
3078 
3079  GNUNET_free (pm);
3081  "Freeing pm done\n");
3082 }
3083 
3084 
3090 static void
3092 {
3093  struct VirtualLink *vl = rc->virtual_link;
3094 
3098  rc->msg_uuid.uuid,
3099  rc));
3100  GNUNET_free (rc);
3101 }
3102 
3103 
3109 static void
3111 {
3112  struct VirtualLink *vl = cls;
3113  struct ReassemblyContext *rc;
3114 
3115  vl->reassembly_timeout_task = NULL;
3116  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3117  {
3119  .rel_value_us)
3120  {
3122  continue;
3123  }
3124  GNUNET_assert (NULL == vl->reassembly_timeout_task);
3128  vl);
3129  return;
3130  }
3131 }
3132 
3133 
3142 static int
3143 free_reassembly_cb (void *cls, uint32_t key, void *value)
3144 {
3145  struct ReassemblyContext *rc = value;
3146 
3147  (void) cls;
3148  (void) key;
3150  return GNUNET_OK;
3151 }
3152 
3153 
3159 static void
3161 {
3162  struct PendingMessage *pm;
3163  struct CoreSentContext *csc;
3164 
3166  "free virtual link %p\n",
3167  vl);
3168 
3169  if (NULL != vl->reassembly_map)
3170  {
3173  NULL);
3175  vl->reassembly_map = NULL;
3177  vl->reassembly_heap = NULL;
3178  }
3179  if (NULL != vl->reassembly_timeout_task)
3180  {
3182  vl->reassembly_timeout_task = NULL;
3183  }
3184  while (NULL != (pm = vl->pending_msg_head))
3188  if (NULL != vl->visibility_task)
3189  {
3191  vl->visibility_task = NULL;
3192  }
3193  if (NULL != vl->fc_retransmit_task)
3194  {
3196  vl->fc_retransmit_task = NULL;
3197  }
3198  while (NULL != (csc = vl->csc_head))
3199  {
3201  GNUNET_assert (vl == csc->vl);
3202  csc->vl = NULL;
3203  }
3204  GNUNET_break (NULL == vl->n);
3205  GNUNET_break (NULL == vl->dv);
3206  GNUNET_free (vl);
3207 }
3208 
3209 
3215 static void
3217 {
3218  GNUNET_assert (
3219  GNUNET_YES ==
3222  vs->hn = NULL;
3223  if (NULL != vs->sc)
3224  {
3226  "store cancel\n");
3228  vs->sc = NULL;
3229  }
3230  GNUNET_free (vs->address);
3231  GNUNET_free (vs);
3232 }
3233 
3234 
3241 static struct Neighbour *
3243 {
3245 }
3246 
3247 
3254 static struct VirtualLink *
3256 {
3258 }
3259 
3260 
3265 {
3272 
3276  struct GNUNET_TIME_Relative rtt;
3277 
3282 
3287 
3292 };
3293 
3294 
3303 static void
3305 {
3306  struct Neighbour *n = dvh->next_hop;
3307  struct DistanceVector *dv = dvh->dv;
3308  struct PendingAcknowledgement *pa;
3309 
3310  while (NULL != (pa = dvh->pa_head))
3311  {
3313  pa->dvh = NULL;
3314  }
3315  GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3317  GNUNET_free (dvh);
3318 }
3319 
3320 
3327 static void
3328 check_link_down (void *cls);
3329 
3330 
3336 static void
3338 {
3340  "Informing CORE clients about disconnect from %s\n",
3341  GNUNET_i2s (pid));
3342  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3343  {
3344  struct GNUNET_MQ_Envelope *env;
3345  struct DisconnectInfoMessage *dim;
3346 
3347  if (CT_CORE != tc->type)
3348  continue;
3350  dim->peer = *pid;
3351  GNUNET_MQ_send (tc->mq, env);
3352  }
3353 }
3354 
3355 
3362 static void
3364 {
3365  struct DistanceVectorHop *dvh;
3366 
3367  while (NULL != (dvh = dv->dv_head))
3369  if (NULL == dv->dv_head)
3370  {
3371  struct VirtualLink *vl;
3372 
3373  GNUNET_assert (
3374  GNUNET_YES ==
3376  if (NULL != (vl = dv->vl))
3377  {
3378  GNUNET_assert (dv == vl->dv);
3379  vl->dv = NULL;
3380  if (NULL == vl->n)
3381  {
3383  free_virtual_link (vl);
3384  }
3385  else
3386  {
3389  }
3390  dv->vl = NULL;
3391  }
3392 
3393  if (NULL != dv->timeout_task)
3394  {
3396  dv->timeout_task = NULL;
3397  }
3398  GNUNET_free (dv);
3399  }
3400 }
3401 
3402 
3416 static void
3418  const struct GNUNET_PeerIdentity *peer,
3419  const char *address,
3420  enum GNUNET_NetworkType nt,
3421  const struct MonitorEvent *me)
3422 {
3423  struct GNUNET_MQ_Envelope *env;
3424  struct GNUNET_TRANSPORT_MonitorData *md;
3425  size_t addr_len = strlen (address) + 1;
3426 
3427  env = GNUNET_MQ_msg_extra (md,
3428  addr_len,
3430  md->nt = htonl ((uint32_t) nt);
3431  md->peer = *peer;
3432  md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3433  md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3434  md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3435  md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3436  md->cs = htonl ((uint32_t) me->cs);
3437  md->num_msg_pending = htonl (me->num_msg_pending);
3438  md->num_bytes_pending = htonl (me->num_bytes_pending);
3439  memcpy (&md[1], address, addr_len);
3440  GNUNET_MQ_send (tc->mq, env);
3441 }
3442 
3443 
3453 static void
3455  const char *address,
3456  enum GNUNET_NetworkType nt,
3457  const struct MonitorEvent *me)
3458 {
3459  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3460  {
3461  if (CT_MONITOR != tc->type)
3462  continue;
3463  if (tc->details.monitor.one_shot)
3464  continue;
3465  if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3466  (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3467  continue;
3469  }
3470 }
3471 
3472 
3482 static void *
3484  struct GNUNET_SERVICE_Client *client,
3485  struct GNUNET_MQ_Handle *mq)
3486 {
3487  struct TransportClient *tc;
3488 
3489  (void) cls;
3490  tc = GNUNET_new (struct TransportClient);
3491  tc->client = client;
3492  tc->mq = mq;
3495  "Client %p of type %u connected\n",
3496  tc,
3497  tc->type);
3498  return tc;
3499 }
3500 
3501 
3507 static void
3508 free_neighbour (struct Neighbour *neighbour)
3509 {
3510  struct DistanceVectorHop *dvh;
3511  struct VirtualLink *vl;
3512 
3513  GNUNET_assert (NULL == neighbour->queue_head);
3516  &neighbour->pid,
3517  neighbour));
3519  "Freeing neighbour\n");
3520  while (NULL != (dvh = neighbour->dv_head))
3521  {
3522  struct DistanceVector *dv = dvh->dv;
3523 
3525  if (NULL == dv->dv_head)
3526  free_dv_route (dv);
3527  }
3528  if (NULL != neighbour->get)
3529  {
3530  GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
3531  neighbour->get = NULL;
3532  }
3533  if (NULL != neighbour->sc)
3534  {
3536  "store cancel\n");
3537  GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3538  neighbour->sc = NULL;
3539  }
3540  if (NULL != (vl = neighbour->vl))
3541  {
3542  GNUNET_assert (neighbour == vl->n);
3543  vl->n = NULL;
3544  if (NULL == vl->dv)
3545  {
3548  }
3549  else
3550  {
3553  }
3554  neighbour->vl = NULL;
3555  }
3556  GNUNET_free (neighbour);
3557 }
3558 
3559 
3566 static void
3568  const struct GNUNET_PeerIdentity *pid)
3569 {
3570  struct GNUNET_MQ_Envelope *env;
3571  struct ConnectInfoMessage *cim;
3572 
3573  GNUNET_assert (CT_CORE == tc->type);
3575  cim->id = *pid;
3576  GNUNET_MQ_send (tc->mq, env);
3577 }
3578 
3579 
3585 static void
3587 {
3589  "Informing CORE clients about connection to %s\n",
3590  GNUNET_i2s (pid));
3591  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3592  {
3593  if (CT_CORE != tc->type)
3594  continue;
3596  }
3597 }
3598 
3599 
3607 static void
3608 transmit_on_queue (void *cls);
3609 
3610 
3614 static unsigned int
3616 {
3617  for (struct Queue *s = queue_head; NULL != s;
3618  s = s->next_client)
3619  {
3620  if (s->tc->details.communicator.address_prefix !=
3621  queue->tc->details.communicator.address_prefix)
3622  {
3624  "queue address %s qid %u compare with queue: address %s qid %u\n",
3625  queue->address,
3626  queue->qid,
3627  s->address,
3628  s->qid);
3629  if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3630  (QUEUE_LENGTH_LIMIT > s->queue_length) )
3631  return GNUNET_YES;
3633  "Lower prio\n");
3634  }
3635  }
3636  return GNUNET_NO;
3637 }
3638 
3639 
3647 static void
3649  struct Queue *queue,
3651 {
3653  queue->tc->details.communicator.
3654  queue_head))
3655  return;
3656 
3657  if (queue->tc->details.communicator.total_queue_length >=
3659  {
3661  "Transmission throttled due to communicator queue limit\n");
3663  GST_stats,
3664  "# Transmission throttled due to communicator queue limit",
3665  1,
3666  GNUNET_NO);
3667  queue->idle = GNUNET_NO;
3668  return;
3669  }
3670  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3671  {
3673  "Transmission throttled due to communicator queue length limit\n");
3675  "# Transmission throttled due to queue queue limit",
3676  1,
3677  GNUNET_NO);
3678  queue->idle = GNUNET_NO;
3679  return;
3680  }
3681  if (0 == queue->q_capacity)
3682  {
3684  "Transmission throttled due to communicator message queue qid %u has capacity %lu.\n",
3685  queue->qid,
3686  queue->q_capacity);
3688  "# Transmission throttled due to message queue capacity",
3689  1,
3690  GNUNET_NO);
3691  queue->idle = GNUNET_NO;
3692  return;
3693  }
3694  /* queue might indeed be ready, schedule it */
3695  if (NULL != queue->transmit_task)
3696  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3697  queue->transmit_task =
3699  queue);
3701  "Considering transmission on queue `%s' QID %llu to %s\n",
3702  queue->address,
3703  (unsigned long long) queue->qid,
3704  GNUNET_i2s (&queue->neighbour->pid));
3705 }
3706 
3707 
3714 static void
3715 check_link_down (void *cls)
3716 {
3717  struct VirtualLink *vl = cls;
3718  struct DistanceVector *dv = vl->dv;
3719  struct Neighbour *n = vl->n;
3720  struct GNUNET_TIME_Absolute dvh_timeout;
3721  struct GNUNET_TIME_Absolute q_timeout;
3722 
3724  "Checking if link is down\n");
3725  vl->visibility_task = NULL;
3726  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3727  if (NULL != dv)
3728  {
3729  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3730  pos = pos->next_dv)
3731  dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3732  pos->path_valid_until);
3733  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3734  {
3735  vl->dv->vl = NULL;
3736  vl->dv = NULL;
3737  }
3738  }
3739  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3740  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3741  q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3742  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3743  {
3744  vl->n->vl = NULL;
3745  vl->n = NULL;
3746  }
3747  if ((NULL == vl->n) && (NULL == vl->dv))
3748  {
3750  free_virtual_link (vl);
3751  return;
3752  }
3753  vl->visibility_task =
3754  GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3755  &check_link_down,
3756  vl);
3757 }
3758 
3759 
3765 static void
3767 {
3768  struct Neighbour *neighbour = queue->neighbour;
3769  struct TransportClient *tc = queue->tc;
3770  struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3772  struct QueueEntry *qe;
3773  int maxxed;
3774  struct PendingAcknowledgement *pa;
3775  struct VirtualLink *vl;
3776 
3778  "Cleaning up queue %u\n", queue->qid);
3779  if (NULL != queue->transmit_task)
3780  {
3781  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3782  queue->transmit_task = NULL;
3783  }
3784  while (NULL != (pa = queue->pa_head))
3785  {
3786  GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3787  pa->queue = NULL;
3788  }
3789 
3790  GNUNET_CONTAINER_MDLL_remove (neighbour,
3791  neighbour->queue_head,
3792  neighbour->queue_tail,
3793  queue);
3795  tc->details.communicator.queue_head,
3796  tc->details.communicator.queue_tail,
3797  queue);
3798  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <=
3799  tc->details.communicator.
3800  total_queue_length);
3802  "Cleaning up queue with length %u\n",
3803  queue->queue_length);
3804  while (NULL != (qe = queue->queue_head))
3805  {
3806  GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3807  queue->queue_length--;
3808  tc->details.communicator.total_queue_length--;
3809  if (NULL != qe->pm)
3810  {
3811  GNUNET_assert (qe == qe->pm->qe);
3812  qe->pm->qe = NULL;
3813  }
3814  GNUNET_free (qe);
3815  }
3817  "Cleaning up queue with length %u\n",
3818  queue->queue_length);
3819  GNUNET_assert (0 == queue->queue_length);
3820  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3821  tc->details.communicator.total_queue_length))
3822  {
3823  /* Communicator dropped below threshold, resume all _other_ queues */
3825  GST_stats,
3826  "# Transmission throttled due to communicator queue limit",
3827  -1,
3828  GNUNET_NO);
3829  for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3830  s = s->next_client)
3832  s,
3834  }
3835  notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3836  GNUNET_free (queue);
3837 
3838  vl = lookup_virtual_link (&neighbour->pid);
3839  if ((NULL != vl) && (neighbour == vl->n))
3840  {
3842  check_link_down (vl);
3843  }
3844  if (NULL == neighbour->queue_head)
3845  {
3846  free_neighbour (neighbour);
3847  }
3848 }
3849 
3850 
3856 static void
3858 {
3859  struct TransportClient *tc = ale->tc;
3860 
3861  GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
3862  tc->details.communicator.addr_tail,
3863  ale);
3864  if (NULL != ale->sc)
3865  {
3867  "store cancel\n");
3869  ale->sc = NULL;
3870  }
3871  if (NULL != ale->st)
3872  {
3873  GNUNET_SCHEDULER_cancel (ale->st);
3874  ale->st = NULL;
3875  }
3876  GNUNET_free (ale);
3877 }
3878 
3879 
3888 static int
3890  const struct GNUNET_PeerIdentity *pid,
3891  void *value)
3892 {
3893  struct TransportClient *tc = cls;
3894  struct PeerRequest *pr = value;
3895 
3897  pr->wc = NULL;
3898  GNUNET_assert (
3899  GNUNET_YES ==
3900  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
3901  pid,
3902  pr));
3903  GNUNET_free (pr);
3904 
3905  return GNUNET_OK;
3906 }
3907 
3908 
3909 static void
3910 do_shutdown (void *cls);
3911 
3920 static void
3922  struct GNUNET_SERVICE_Client *client,
3923  void *app_ctx)
3924 {
3925  struct TransportClient *tc = app_ctx;
3926 
3927  (void) cls;
3928  (void) client;
3930  switch (tc->type)
3931  {
3932  case CT_NONE:
3934  "Unknown Client %p disconnected, cleaning up.\n",
3935  tc);
3936  break;
3937 
3938  case CT_CORE: {
3940  "CORE Client %p disconnected, cleaning up.\n",
3941  tc);
3942 
3943  struct PendingMessage *pm;
3944 
3945  while (NULL != (pm = tc->details.core.pending_msg_head))
3946  {
3948  tc->details.core.pending_msg_head,
3949  tc->details.core.pending_msg_tail,
3950  pm);
3951  pm->client = NULL;
3952  }
3953  }
3954  break;
3955 
3956  case CT_MONITOR:
3958  "MONITOR Client %p disconnected, cleaning up.\n",
3959  tc);
3960 
3961  break;
3962 
3963  case CT_COMMUNICATOR: {
3965  "COMMUNICATOR Client %p disconnected, cleaning up.\n",
3966  tc);
3967 
3968  struct Queue *q;
3969  struct AddressListEntry *ale;
3970 
3971  while (NULL != (q = tc->details.communicator.queue_head))
3972  free_queue (q);
3973  while (NULL != (ale = tc->details.communicator.addr_head))
3975  GNUNET_free (tc->details.communicator.address_prefix);
3976  }
3977  break;
3978 
3979  case CT_APPLICATION:
3981  "APPLICATION Client %p disconnected, cleaning up.\n",
3982  tc);
3983 
3984  GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
3986  tc);
3987  GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
3988  break;
3989  }
3990  GNUNET_free (tc);
3991  if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
3992  {
3994  "Our last client disconnected\n");
3995  do_shutdown (cls);
3996  }
3997 }
3998 
3999 
4009 static int
4011  const struct GNUNET_PeerIdentity *pid,
4012  void *value)
4013 {
4014  struct TransportClient *tc = cls;
4015 
4016  (void) value;
4018  "Telling new CORE client about existing connection to %s\n",
4019  GNUNET_i2s (pid));
4021  return GNUNET_OK;
4022 }
4023 
4024 
4033 static void
4034 handle_client_start (void *cls, const struct StartMessage *start)
4035 {
4036  struct TransportClient *tc = cls;
4037  uint32_t options;
4038 
4039  options = ntohl (start->options);
4040  if ((0 != (1 & options)) &&
4041  (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4042  {
4043  /* client thinks this is a different peer, reject */
4044  GNUNET_break (0);
4045  GNUNET_SERVICE_client_drop (tc->client);
4046  return;
4047  }
4048  if (CT_NONE != tc->type)
4049  {
4050  GNUNET_break (0);
4051  GNUNET_SERVICE_client_drop (tc->client);
4052  return;
4053  }
4054  tc->type = CT_CORE;
4056  "New CORE client with PID %s registered\n",
4057  GNUNET_i2s (&start->self));
4060  tc);
4062 }
4063 
4064 
4071 static int
4072 check_client_send (void *cls, const struct OutboundMessage *obm)
4073 {
4074  struct TransportClient *tc = cls;
4075  uint16_t size;
4076  const struct GNUNET_MessageHeader *obmm;
4077 
4078  if (CT_CORE != tc->type)
4079  {
4080  GNUNET_break (0);
4081  return GNUNET_SYSERR;
4082  }
4083  size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4084  if (size < sizeof(struct GNUNET_MessageHeader))
4085  {
4086  GNUNET_break (0);
4087  return GNUNET_SYSERR;
4088  }
4089  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4090  if (size != ntohs (obmm->size))
4091  {
4092  GNUNET_break (0);
4093  return GNUNET_SYSERR;
4094  }
4095  return GNUNET_OK;
4096 }
4097 
4098 
4106 static void
4108 {
4109  struct TransportClient *tc = pm->client;
4110  struct VirtualLink *vl = pm->vl;
4111 
4113  "client send response\n");
4114  if (NULL != tc)
4115  {
4116  struct GNUNET_MQ_Envelope *env;
4117  struct SendOkMessage *so_msg;
4118 
4120  so_msg->peer = vl->target;
4122  "Confirming transmission of <%llu> to %s\n",
4123  pm->logging_uuid,
4124  GNUNET_i2s (&vl->target));
4125  GNUNET_MQ_send (tc->mq, env);
4126  }
4128 }
4129 
4130 
4140 static unsigned int
4143  struct DistanceVectorHop **hops_array,
4144  unsigned int hops_array_length)
4145 {
4146  uint64_t choices[hops_array_length];
4147  uint64_t num_dv;
4148  unsigned int dv_count;
4149 
4150  /* Pick random vectors, but weighted by distance, giving more weight
4151  to shorter vectors */
4152  num_dv = 0;
4153  dv_count = 0;
4154  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4155  pos = pos->next_dv)
4156  {
4157  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4158  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4159  .rel_value_us == 0))
4160  continue; /* pos unconfirmed and confirmed required */
4161  num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4162  dv_count++;
4163  }
4164  if (0 == dv_count)
4165  return 0;
4166  if (dv_count <= hops_array_length)
4167  {
4168  dv_count = 0;
4169  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4170  pos = pos->next_dv)
4171  hops_array[dv_count++] = pos;
4172  return dv_count;
4173  }
4174  for (unsigned int i = 0; i < hops_array_length; i++)
4175  {
4176  int ok = GNUNET_NO;
4177  while (GNUNET_NO == ok)
4178  {
4179  choices[i] =
4181  ok = GNUNET_YES;
4182  for (unsigned int j = 0; j < i; j++)
4183  if (choices[i] == choices[j])
4184  {
4185  ok = GNUNET_NO;
4186  break;
4187  }
4188  }
4189  }
4190  dv_count = 0;
4191  num_dv = 0;
4192  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4193  pos = pos->next_dv)
4194  {
4195  uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4196 
4197  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4198  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4199  .rel_value_us == 0))
4200  continue; /* pos unconfirmed and confirmed required */
4201  for (unsigned int i = 0; i < hops_array_length; i++)
4202  if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4203  hops_array[dv_count++] = pos;
4204  num_dv += delta;
4205  }
4206  return dv_count;
4207 }
4208 
4209 
4216 static int
4218  void *cls,
4219  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4220 {
4221  struct TransportClient *tc = cls;
4222  uint16_t size;
4223 
4224  if (CT_NONE != tc->type)
4225  {
4226  GNUNET_break (0);
4227  return GNUNET_SYSERR;
4228  }
4229  tc->type = CT_COMMUNICATOR;
4230  size = ntohs (cam->header.size) - sizeof(*cam);
4231  if (0 == size)
4232  return GNUNET_OK; /* receive-only communicator */
4234  return GNUNET_OK;
4235 }
4236 
4237 
4243 static void
4245  unsigned
4246  int continue_client)
4247 {
4248  if (0 != ntohl (cmc->im.fc_on))
4249  {
4250  /* send ACK when done to communicator for flow control! */
4251  struct GNUNET_MQ_Envelope *env;
4252  struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4253 
4255  "Acknowledge message with flow control id %lu\n",
4256  cmc->im.fc_id);
4258  ack->reserved = htonl (0);
4259  ack->fc_id = cmc->im.fc_id;
4260  ack->sender = cmc->im.neighbour_sender;
4261  GNUNET_MQ_send (cmc->tc->mq, env);
4262  }
4263 
4264  if (GNUNET_YES == continue_client)
4265  {
4267  }
4268  GNUNET_free (cmc);
4269 }
4270 
4271 
4272 static void
4274 {
4276 }
4277 
4278 
4288 static void
4289 handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4290 {
4291  struct TransportClient *tc = cls;
4292  struct VirtualLink *vl;
4293  uint32_t delta;
4294  struct CommunicatorMessageContext *cmc;
4295 
4296  if (CT_CORE != tc->type)
4297  {
4298  GNUNET_break (0);
4299  GNUNET_SERVICE_client_drop (tc->client);
4300  return;
4301  }
4302  vl = lookup_virtual_link (&rom->peer);
4303  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4304  {
4306  "# RECV_OK dropped: virtual link unknown",
4307  1,
4308  GNUNET_NO);
4310  return;
4311  }
4312  delta = ntohl (rom->increase_window_delta);
4313  vl->core_recv_window += delta;
4315  "CORE ack receiving message, increased CORE recv window to %u\n",
4316  vl->core_recv_window);
4317  if (vl->core_recv_window <= 0)
4318  return;
4319  /* resume communicators */
4320  while (NULL != (cmc = vl->cmc_tail))
4321  {
4323  finish_cmc_handling (cmc);
4324  }
4326 }
4327 
4328 
4335 static void
4337  void *cls,
4338  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4339 {
4340  struct TransportClient *tc = cls;
4341  uint16_t size;
4342 
4343  size = ntohs (cam->header.size) - sizeof(*cam);
4344  if (0 == size)
4345  {
4347  "Receive-only communicator connected\n");
4348  return; /* receive-only communicator */
4349  }
4350  tc->details.communicator.address_prefix =
4351  GNUNET_strdup ((const char *) &cam[1]);
4352  tc->details.communicator.cc =
4353  (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
4355  "Communicator with prefix `%s' connected\n",
4356  tc->details.communicator.address_prefix);
4358 }
4359 
4360 
4368 static int
4370  void *cls,
4371  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4372 {
4373  const struct GNUNET_MessageHeader *inbox;
4374  const char *is;
4375  uint16_t msize;
4376  uint16_t isize;
4377 
4378  (void) cls;
4379  msize = ntohs (cb->header.size) - sizeof(*cb);
4380  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
4381  isize = ntohs (inbox->size);
4382  if (isize >= msize)
4383  {
4384  GNUNET_break (0);
4385  return GNUNET_SYSERR;
4386  }
4387  is = (const char *) inbox;
4388  is += isize;
4389  msize -= isize;
4390  GNUNET_assert (0 < msize);
4391  if ('\0' != is[msize - 1])
4392  {
4393  GNUNET_break (0);
4394  return GNUNET_SYSERR;
4395  }
4396  return GNUNET_OK;
4397 }
4398 
4399 
4406 static void
4408 {
4409  struct EphemeralConfirmationPS ec;
4410 
4411  if (0 !=
4413  return;
4415  dv->ephemeral_validity =
4420  ec.target = dv->target;
4421  ec.ephemeral_key = dv->ephemeral_key;
4423  ec.purpose.size = htonl (sizeof(ec));
4425  &ec,
4426  &dv->sender_sig);
4427 }
4428 
4429 
4439 static void
4441  struct PendingMessage *pm,
4442  const void *payload,
4443  size_t payload_size)
4444 {
4445  struct Neighbour *n = queue->neighbour;
4446  struct GNUNET_TRANSPORT_SendMessageTo *smt;
4447  struct GNUNET_MQ_Envelope *env;
4448 
4449  GNUNET_log (
4451  "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
4452  (unsigned int) payload_size,
4453  (NULL == pm) ? 0 : pm->logging_uuid,
4454  (unsigned long long) queue->qid,
4455  GNUNET_i2s (&queue->neighbour->pid));
4456  env = GNUNET_MQ_msg_extra (smt,
4457  payload_size,
4459  smt->qid = queue->qid;
4460  smt->mid = queue->mid_gen;
4461  smt->receiver = n->pid;
4462  memcpy (&smt[1], payload, payload_size);
4463  {
4464  /* Pass the env to the communicator of queue for transmission. */
4465  struct QueueEntry *qe;
4466 
4467  qe = GNUNET_new (struct QueueEntry);
4468  qe->mid = queue->mid_gen++;
4469  qe->queue = queue;
4470  if (NULL != pm)
4471  {
4472  qe->pm = pm;
4473  // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4474  // GNUNET_assert (NULL == pm->qe);
4475  if (NULL != pm->qe)
4476  {
4478  "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4479  pm->logging_uuid,
4480  (unsigned long long) pm->qe->mid);
4481  // pm->qe->pm = NULL;
4482  }
4483  pm->qe = qe;
4484  }
4485  GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4486  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4487  queue->queue_length++;
4488  queue->tc->details.communicator.total_queue_length++;
4489  //FIXME Probably this if statement here is completely wrong in this method,
4490  // and only fixed a symptom, but not an actual bug.
4491  if (0 == queue->q_capacity)
4492  {
4493  GNUNET_free (env);
4494  return;
4495  }
4496  if (GNUNET_NO == queue->unlimited_length)
4497  queue->q_capacity--;
4499  "Queue %s with qid %u has capacity %lu\n",
4500  queue->address,
4501  queue->qid,
4502  queue->q_capacity);
4504  queue->tc->details.communicator.total_queue_length)
4505  queue->idle = GNUNET_NO;
4506  if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4507  queue->idle = GNUNET_NO;
4508  if (0 == queue->q_capacity)
4509  queue->idle = GNUNET_NO;
4511  "Sending message MID %lu of type %u (%u) and size %lu with MQ %p\n",
4512  smt->mid,
4513  ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4514  ntohs (smt->header.size),
4515  payload_size,
4516  queue->tc->mq);
4517  GNUNET_MQ_send (queue->tc->mq, env);
4518  }
4519 }
4520 
4521 
4532 static struct GNUNET_TIME_Relative
4533 route_via_neighbour (const struct Neighbour *n,
4534  const struct GNUNET_MessageHeader *hdr,
4536 {
4537  struct GNUNET_TIME_Absolute now;
4538  unsigned int candidates;
4539  unsigned int sel1;
4540  unsigned int sel2;
4541  struct GNUNET_TIME_Relative rtt;
4542 
4543  /* Pick one or two 'random' queues from n (under constraints of options) */
4544  now = GNUNET_TIME_absolute_get ();
4545  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4546  weight in the future; weight could be assigned by observed
4547  bandwidth (note: not sure if we should do this for this type
4548  of control traffic though). */
4549  candidates = 0;
4550  for (struct Queue *pos = n->queue_head; NULL != pos;
4551  pos = pos->next_neighbour)
4552  {
4553  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4554  (pos->validated_until.abs_value_us > now.abs_value_us))
4555  candidates++;
4556  }
4557  if (0 == candidates)
4558  {
4559  /* This can happen rarely if the last confirmed queue timed
4560  out just as we were beginning to process this message. */
4562  "Could not route message of type %u to %s: no valid queue\n",
4563  ntohs (hdr->type),
4564  GNUNET_i2s (&n->pid));
4566  "# route selection failed (all no valid queue)",
4567  1,
4568  GNUNET_NO);
4570  }
4571 
4574  if (0 == (options & RMO_REDUNDANT))
4575  sel2 = candidates; /* picks none! */
4576  else
4578  candidates = 0;
4579  for (struct Queue *pos = n->queue_head; NULL != pos;
4580  pos = pos->next_neighbour)
4581  {
4582  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4583  (pos->validated_until.abs_value_us > now.abs_value_us))
4584  {
4585  if ((sel1 == candidates) || (sel2 == candidates))
4586  {
4588  "Routing message of type %u to %s using %s (#%u)\n",
4589  ntohs (hdr->type),
4590  GNUNET_i2s (&n->pid),
4591  pos->address,
4592  (sel1 == candidates) ? 1 : 2);
4593  rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4594  queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4595  }
4596  candidates++;
4597  }
4598  }
4599  return rtt;
4600 }
4601 
4602 
4607 {
4611  gcry_cipher_hd_t cipher;
4612 
4616  struct
4617  {
4622 
4626  char aes_key[256 / 8];
4627 
4631  char aes_ctr[128 / 8];
4633 };
4634 
4635 
4644 static void
4646  const struct GNUNET_ShortHashCode *iv,
4647  struct DVKeyState *key)
4648 {
4649  /* must match #dh_key_derive_eph_pub */
4651  GNUNET_CRYPTO_kdf (&key->material,
4652  sizeof(key->material),
4653  "transport-backchannel-key",
4654  strlen ("transport-backchannel-key"),
4655  km,
4656  sizeof(*km),
4657  iv,
4658  sizeof(*iv),
4659  NULL));
4661  "Deriving backchannel key based on KM %s and IV %s\n",
4662  GNUNET_h2s (km),
4663  GNUNET_sh2s (iv));
4664  GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4665  GCRY_CIPHER_AES256 /* low level: go for speed */,
4666  GCRY_CIPHER_MODE_CTR,
4667  0 /* flags */));
4668  GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4669  &key->material.aes_key,
4670  sizeof(key->material.aes_key)));
4671  gcry_cipher_setctr (key->cipher,
4672  &key->material.aes_ctr,
4673  sizeof(key->material.aes_ctr));
4674 }
4675 
4676 
4687 static enum GNUNET_GenericReturnValue
4689  const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
4690  const struct GNUNET_PeerIdentity *target,
4691  const struct GNUNET_ShortHashCode *iv,
4692  struct DVKeyState *key)
4693 {
4694  struct GNUNET_HashCode km;
4695 
4696  if (GNUNET_YES != GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
4697  &target->public_key,
4698  &km))
4699  return GNUNET_SYSERR;
4700  // FIXME: Possibly also add return values here. We are processing
4701  // Input from other peers...
4702  dv_setup_key_state_from_km (&km, iv, key);
4703  return GNUNET_OK;
4704 }
4705 
4706 
4717 static enum GNUNET_GenericReturnValue
4718 dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
4719  const struct GNUNET_ShortHashCode *iv,
4720  struct DVKeyState *key)
4721 {
4722  struct GNUNET_HashCode km;
4723 
4725  pub_ephemeral,
4726  &km))
4727  return GNUNET_SYSERR;
4728  dv_setup_key_state_from_km (&km, iv, key);
4729  return GNUNET_OK;
4730 }
4731 
4732 
4742 static void
4743 dv_hmac (const struct DVKeyState *key,
4744  struct GNUNET_HashCode *hmac,
4745  const void *data,
4746  size_t data_size)
4747 {
4748  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4749 }
4750 
4751 
4761 static void
4762 dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4763 {
4764  GNUNET_assert (0 ==
4765  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4766 }
4767 
4768 
4779 static enum GNUNET_GenericReturnValue
4780 dv_decrypt (struct DVKeyState *key,
4781  void *out,
4782  const void *ciph,
4783  size_t out_size)
4784 {
4785  return (0 ==
4786  gcry_cipher_decrypt (key->cipher,
4787  out, out_size,
4788  ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
4789 }
4790 
4791 
4797 static void
4799 {
4800  gcry_cipher_close (key->cipher);
4801  GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
4802 }
4803 
4804 
4815 typedef void (*DVMessageHandler) (void *cls,
4816  struct Neighbour *next_hop,
4817  const struct GNUNET_MessageHeader *hdr,
4819 
4834 static struct GNUNET_TIME_Relative
4836  unsigned int num_dvhs,
4837  struct DistanceVectorHop **dvhs,
4838  const struct GNUNET_MessageHeader *hdr,
4839  DVMessageHandler use,
4840  void *use_cls,
4842  enum GNUNET_GenericReturnValue without_fc)
4843 {
4844  struct TransportDVBoxMessage box_hdr;
4845  struct TransportDVBoxPayloadP payload_hdr;
4846  uint16_t enc_body_size = ntohs (hdr->size);
4847  char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
4848  struct DVKeyState *key;
4849  struct GNUNET_TIME_Relative rtt;
4850 
4851  key = GNUNET_new (struct DVKeyState);
4852  /* Encrypt payload */
4854  box_hdr.total_hops = htons (0);
4855  box_hdr.without_fc = htons (without_fc);
4856  update_ephemeral (dv);
4857  box_hdr.ephemeral_key = dv->ephemeral_key;
4858  payload_hdr.sender_sig = dv->sender_sig;
4859 
4861  &box_hdr.iv,
4862  sizeof(box_hdr.iv));
4863  // We are creating this key, so this must work.
4865  dh_key_derive_eph_pid (&dv->private_key,
4866  &dv->target,
4867  &box_hdr.iv, key));
4868  payload_hdr.sender = GST_my_identity;
4869  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
4870  dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
4871  dv_encrypt (key,
4872  hdr,
4873  &enc[sizeof(struct TransportDVBoxPayloadP)],
4874  enc_body_size);
4875  dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
4876  dv_key_clean (key);
4878  /* For each selected path, take the pre-computed header and body
4879  and add the path in the middle of the message; then send it. */
4880  for (unsigned int i = 0; i < num_dvhs; i++)
4881  {
4882  struct DistanceVectorHop *dvh = dvhs[i];
4883  unsigned int num_hops = dvh->distance + 1;
4884  char buf[sizeof(struct TransportDVBoxMessage)
4885  + sizeof(struct GNUNET_PeerIdentity) * num_hops
4886  + sizeof(struct TransportDVBoxPayloadP)
4887  + enc_body_size] GNUNET_ALIGN;
4888  struct GNUNET_PeerIdentity *dhops;
4889 
4890  box_hdr.header.size = htons (sizeof(buf));
4891  box_hdr.orig_size = htons (sizeof(buf));
4892  box_hdr.num_hops = htons (num_hops);
4893  memcpy (buf, &box_hdr, sizeof(box_hdr));
4894  dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
4895  memcpy (dhops,
4896  dvh->path,
4897  dvh->distance * sizeof(struct GNUNET_PeerIdentity));
4898  dhops[dvh->distance] = dv->target;
4899  if (GNUNET_EXTRA_LOGGING > 0)
4900  {
4901  char *path;
4902 
4904  for (unsigned int j = 0; j < num_hops; j++)
4905  {
4906  char *tmp;
4907 
4908  GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
4909  GNUNET_free (path);
4910  path = tmp;
4911  }
4913  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
4914  ntohs (hdr->type),
4915  GNUNET_i2s (&dv->target),
4916  i + 1,
4917  num_dvhs,
4918  path);
4919  GNUNET_free (path);
4920  }
4921  rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
4922  memcpy (&dhops[num_hops], enc, sizeof(enc));
4923  use (use_cls,
4924  dvh->next_hop,
4925  (const struct GNUNET_MessageHeader *) buf,
4926  options);
4927  GNUNET_free (key);
4928  }
4929  return rtt;
4930 }
4931 
4932 
4942 static void
4944  struct Neighbour *next_hop,
4945  const struct GNUNET_MessageHeader *hdr,
4947 {
4948  (void) cls;
4949  (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
4950 }
4951 
4952 
4964 static struct GNUNET_TIME_Relative
4966 // route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4967  const struct GNUNET_MessageHeader *hdr,
4969 {
4970  // struct VirtualLink *vl;
4971  struct Neighbour *n;
4972  struct DistanceVector *dv;
4973  struct GNUNET_TIME_Relative rtt1;
4974  struct GNUNET_TIME_Relative rtt2;
4975  const struct GNUNET_PeerIdentity *target = &vl->target;
4976 
4978  "Trying to route message of type %u to %s without fc\n",
4979  ntohs (hdr->type),
4980  GNUNET_i2s (target));
4981 
4982  // TODO Do this elsewhere. vl should be given as parameter to method.
4983  // vl = lookup_virtual_link (target);
4984  GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
4985  if (NULL == vl)
4987  n = vl->n;
4988  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
4989  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
4990  {
4991  /* if confirmed is required, and we do not have anything
4992  confirmed, drop respective options */
4993  if (NULL == n)
4994  n = lookup_neighbour (target);
4995  if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
4997  }
4998  if ((NULL == n) && (NULL == dv))
4999  {
5001  "Cannot route message of type %u to %s: no route\n",
5002  ntohs (hdr->type),
5003  GNUNET_i2s (target));
5005  "# Messages dropped in routing: no acceptable method",
5006  1,
5007  GNUNET_NO);
5009  }
5011  "Routing message of type %u to %s with options %X\n",
5012  ntohs (hdr->type),
5013  GNUNET_i2s (target),
5014  (unsigned int) options);
5015  /* If both dv and n are possible and we must choose:
5016  flip a coin for the choice between the two; for now 50/50 */
5017  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5018  {
5020  n = NULL;
5021  else
5022  dv = NULL;
5023  }
5024  if ((NULL != n) && (NULL != dv))
5025  options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5026  enough for redundancy, so clear the flag. */
5029  if (NULL != n)
5030  {
5032  "Try to route message of type %u to %s without fc via neighbour\n",
5033  ntohs (hdr->type),
5034  GNUNET_i2s (target));
5035  rtt1 = route_via_neighbour (n, hdr, options);
5036  }
5037  if (NULL != dv)
5038  {
5039  struct DistanceVectorHop *hops[2];
5040  unsigned int res;
5041 
5043  options,
5044  hops,
5045  (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5046  if (0 == res)
5047  {
5049  "Failed to route message, could not determine DV path\n");
5050  return rtt1;
5051  }
5053  "encapsulate_for_dv 1\n");
5054  rtt2 = encapsulate_for_dv (dv,
5055  res,
5056  hops,
5057  hdr,
5059  NULL,
5060  options & (~RMO_REDUNDANT),
5061  GNUNET_YES);
5062  }
5063  return GNUNET_TIME_relative_min (rtt1, rtt2);
5064 }
5065 
5066 
5067 static void
5068 consider_sending_fc (void *cls);
5069 
5076 static void
5078 {
5079  struct VirtualLink *vl = cls;
5080  vl->fc_retransmit_task = NULL;
5081  consider_sending_fc (cls);
5082 }
5083 
5084 
5091 static void
5093 {
5094  struct VirtualLink *vl = cls;
5095  struct GNUNET_TIME_Absolute monotime;
5096  struct TransportFlowControlMessage fc;
5098  struct GNUNET_TIME_Relative rtt;
5099 
5101  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5102  it always! */
5103  /* For example, we should probably ONLY do this if a bit more than
5104  an RTT has passed, or if the window changed "significantly" since
5105  then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5106  need an estimate for the bandwidth-delay-product for the entire
5107  VL, as that determines "significantly". We have the delay, but
5108  the bandwidth statistics need to be added for the VL!*/(void) duration;
5109 
5111  "Sending FC seq %u to %s with new window %llu\n",
5112  (unsigned int) vl->fc_seq_gen,
5113  GNUNET_i2s (&vl->target),
5114  (unsigned long long) vl->incoming_fc_window_size);
5116  vl->last_fc_transmission = monotime;
5118  fc.header.size = htons (sizeof(fc));
5119  fc.seq = htonl (vl->fc_seq_gen++);
5123  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
5125  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5126  {
5129  "FC retransmission to %s failed, will retry in %s\n",
5130  GNUNET_i2s (&vl->target),
5133  }
5134  else
5135  {
5136  /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5137  vl->last_fc_rtt = rtt;
5138  }
5139  if (NULL != vl->fc_retransmit_task)
5142  {
5144  vl->fc_retransmit_count = 0;
5145  }
5146  vl->fc_retransmit_task =
5148  vl->fc_retransmit_count++;
5149 }
5150 
5151 
5168 static void
5170 {
5171  struct Neighbour *n = vl->n;
5172  struct DistanceVector *dv = vl->dv;
5173  struct GNUNET_TIME_Absolute now;
5174  struct VirtualLink *vl_next_hop;
5175  int elig;
5176 
5178  "check_vl_transmission to target %s\n",
5179  GNUNET_i2s (&vl->target));
5180  /* Check that we have an eligible pending message!
5181  (cheaper than having #transmit_on_queue() find out!) */
5182  elig = GNUNET_NO;
5183  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5184  pm = pm->next_vl)
5185  {
5187  "check_vl_transmission loop\n");
5188  if (NULL != pm->qe)
5189  continue; /* not eligible, is in a queue! */
5190  if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5192  {
5194  "Stalled message %llu transmission on VL %s due to flow control: %llu < %llu\n",
5195  pm->logging_uuid,
5196  GNUNET_i2s (&vl->target),
5197  (unsigned long long) vl->outbound_fc_window_size,
5198  (unsigned long long) (pm->bytes_msg
5200  consider_sending_fc (vl);
5201  return; /* We have a message, but flow control says "nope" */
5202  }
5204  "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5205  GNUNET_i2s (&vl->target));
5206  /* Notify queues at direct neighbours that we are interested */
5207  now = GNUNET_TIME_absolute_get ();
5208  if (NULL != n)
5209  {
5210  for (struct Queue *queue = n->queue_head; NULL != queue;
5211  queue = queue->next_neighbour)
5212  {
5213  if ((GNUNET_YES == queue->idle) &&
5214  (queue->validated_until.abs_value_us > now.abs_value_us))
5215  {
5217  "Direct neighbour %s not stalled\n",
5218  GNUNET_i2s (&n->pid));
5220  queue,
5222  elig = GNUNET_YES;
5223  }
5224  else
5226  "Neighbour Queue QID: %u (%u) busy or invalid\n",
5227  queue->qid,
5228  queue->idle);
5229  }
5230  }
5231  /* Notify queues via DV that we are interested */
5232  if (NULL != dv)
5233  {
5234  /* Do DV with lower scheduler priority, which effectively means that
5235  IF a neighbour exists and is available, we prefer it. */
5236  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5237  pos = pos->next_dv)
5238  {
5239  struct Neighbour *nh = pos->next_hop;
5240 
5241 
5242  if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5243  continue; /* skip this one: path not validated */
5244  else
5245  {
5246  vl_next_hop = lookup_virtual_link (&nh->pid);
5247  GNUNET_assert (NULL != vl_next_hop);
5248  if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5249  vl_next_hop->outbound_fc_window_size)
5250  {
5252  "Stalled message %llu transmission on next hop %s due to flow control: %llu < %llu\n",
5253  pm->logging_uuid,
5254  GNUNET_i2s (&vl_next_hop->target),
5255  (unsigned long
5256  long) vl_next_hop->outbound_fc_window_size,
5257  (unsigned long long) (pm->bytes_msg
5258  + vl_next_hop->
5259  outbound_fc_window_size_used));
5260  consider_sending_fc (vl_next_hop);
5261  continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5262  }
5263  for (struct Queue *queue = nh->queue_head; NULL != queue;
5264  queue = queue->next_neighbour)
5265  if ((GNUNET_YES == queue->idle) &&
5266  (queue->validated_until.abs_value_us > now.abs_value_us))
5267  {
5269  "Next hop neighbour %s not stalled\n",
5270  GNUNET_i2s (&nh->pid));
5272  queue,
5274  elig = GNUNET_YES;
5275  }
5276  else
5278  "DV Queue QID: %u (%u) busy or invalid\n",
5279  queue->qid,
5280  queue->idle);
5281  }
5282  }
5283  }
5284  if (GNUNET_YES == elig)
5286  "Eligible message %llu of size %u to %s: %llu/%llu\n",
5287  pm->logging_uuid,
5288  pm->bytes_msg,
5289  GNUNET_i2s (&vl->target),
5290  (unsigned long long) vl->outbound_fc_window_size,
5291  (unsigned long long) (pm->bytes_msg
5293  break;
5294  }
5295 }
5296 
5297 
5304 static void
5305 handle_client_send (void *cls, const struct OutboundMessage *obm)
5306 {
5307  struct TransportClient *tc = cls;
5308  struct PendingMessage *pm;
5309  const struct GNUNET_MessageHeader *obmm;
5310  uint32_t bytes_msg;
5311  struct VirtualLink *vl;
5313 
5314  GNUNET_assert (CT_CORE == tc->type);
5315  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5316  bytes_msg = ntohs (obmm->size);
5317  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5318  vl = lookup_virtual_link (&obm->peer);
5319  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5320  {
5322  "Don't have %s as a neighbour (anymore).\n",
5323  GNUNET_i2s (&obm->peer));
5324  /* Failure: don't have this peer as a neighbour (anymore).
5325  Might have gone down asynchronously, so this is NOT
5326  a protocol violation by CORE. Still count the event,
5327  as this should be rare. */
5330  "# messages dropped (neighbour unknown)",
5331  1,
5332  GNUNET_NO);
5333  return;
5334  }
5335 
5336  pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5338  "1 created pm %p storing vl %p\n",
5339  pm,
5340  vl);
5341  pm->logging_uuid = logging_uuid_gen++;
5342  pm->prefs = pp;
5343  pm->client = tc;
5344  pm->vl = vl;
5345  pm->bytes_msg = bytes_msg;
5346  memcpy (&pm[1], obmm, bytes_msg);
5348  "Sending %u bytes as <%llu> to %s\n",
5349  bytes_msg,
5350  pm->logging_uuid,
5351  GNUNET_i2s (&obm->peer));
5353  tc->details.core.pending_msg_head,
5354  tc->details.core.pending_msg_tail,
5355  pm);
5357  vl->pending_msg_head,
5358  vl->pending_msg_tail,
5359  pm);
5360  check_vl_transmission (vl);
5362 }
5363 
5364 
5374 static void
5376  void *cls,
5377  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
5378 {
5379  struct Neighbour *n;
5380  struct VirtualLink *vl;
5381  struct TransportClient *tc = cls;
5382  const struct GNUNET_MessageHeader *inbox =
5383  (const struct GNUNET_MessageHeader *) &cb[1];
5384  uint16_t isize = ntohs (inbox->size);
5385  const char *is = ((const char *) &cb[1]) + isize;
5386  size_t slen = strlen (is) + 1;
5387  char
5388  mbuf[slen + isize
5389  + sizeof(struct
5393 
5394  /* 0-termination of 'is' was checked already in
5395  #check_communicator_backchannel() */
5397  "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5398  GNUNET_i2s (&cb->pid),
5399  is,
5400  ntohs (inbox->type),
5401  ntohs (inbox->size));
5402  /* encapsulate and encrypt message */
5403  be->header.type =
5405  be->header.size = htons (sizeof(mbuf));
5406  memcpy (&be[1], inbox, isize);
5407  memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5408  + isize],
5409  is,
5410  strlen (is) + 1);
5411  // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5412  vl = lookup_virtual_link (&cb->pid);
5413  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5414  {
5416  }
5417  else
5418  {
5419  /* Use route via neighbour */
5420  n = lookup_neighbour (&cb->pid);
5421  if (NULL != n)
5423  n,
5424  &be->header,
5425  RMO_NONE);
5426  }
5428 }
5429 
5430 
5438 static int
5440  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5441 {
5442  struct TransportClient *tc = cls;
5443 
5444  if (CT_COMMUNICATOR != tc->type)
5445  {
5446  GNUNET_break (0);
5447  return GNUNET_SYSERR;
5448  }
5450  return GNUNET_OK;
5451 }
5452 
5453 
5459 static void
5460 store_pi (void *cls);
5461 
5462 
5469 static void
5470 peerstore_store_own_cb (void *cls, int success)
5471 {
5472  struct AddressListEntry *ale = cls;
5473 
5474  ale->sc = NULL;
5475  if (GNUNET_YES != success)
5477  "Failed to store our own address `%s' in peerstore!\n",
5478  ale->address);
5479  else
5481  "Successfully stored our own address `%s' in peerstore!\n",
5482  ale->address);
5483  /* refresh period is 1/4 of expiration time, that should be plenty
5484  without being excessive. */
5485  ale->st =
5487  4ULL),
5488  &store_pi,
5489  ale);
5490 }
5491 
5492 
5498 static void
5499 store_pi (void *cls)
5500 {
5501  struct AddressListEntry *ale = cls;
5502  void *addr;
5503  size_t addr_len;
5505 
5506  ale->st = NULL;
5509  "Storing our address `%s' in peerstore until %s!\n",
5510  ale->address,
5513  ale->nt,
5516  &addr,
5517  &addr_len);
5519  "transport",
5520  &GST_my_identity,
5522  addr,
5523  addr_len,
5524  expiration,
5527  ale);
5528  GNUNET_free (addr);
5529  if (NULL == ale->sc)
5530  {
5532  "Failed to store our address `%s' with peerstore\n",
5533  ale->address);
5534  ale->st =
5536  }
5537 }
5538 
5539 
5546 static void
5548  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5549 {
5550  struct TransportClient *tc = cls;
5551  struct AddressListEntry *ale;
5552  size_t slen;
5553 
5554  /* 0-termination of &aam[1] was checked in #check_add_address */
5556  "Communicator added address `%s'!\n",
5557  (const char *) &aam[1]);
5558  slen = ntohs (aam->header.size) - sizeof(*aam);
5559  ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5560  ale->tc = tc;
5561  ale->address = (const char *) &ale[1];
5562  ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
5563  ale->aid = aam->aid;
5564  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5565  memcpy (&ale[1], &aam[1], slen);
5566  GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5567  tc->details.communicator.addr_tail,
5568  ale);
5569  ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5571 }
5572 
5573 
5580 static void
5582  const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5583 {
5584  struct TransportClient *tc = cls;
5585  struct AddressListEntry *alen;
5586 
5587  if (CT_COMMUNICATOR != tc->type)
5588  {
5589  GNUNET_break (0);
5590  GNUNET_SERVICE_client_drop (tc->client);
5591  return;
5592  }
5593  for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5594  NULL != ale;
5595  ale = alen)
5596  {
5597  alen = ale->next;
5598  if (dam->aid != ale->aid)
5599  continue;
5600  GNUNET_assert (ale->tc == tc);
5602  "Communicator deleted address `%s'!\n",
5603  ale->address);
5606  return;
5607  }
5609  "Communicator removed address we did not even have.\n");
5611  // GNUNET_SERVICE_client_drop (tc->client);
5612 }
5613 
5614 
5622 static void
5624 
5625 
5633 static void
5634 core_env_sent_cb (void *cls)
5635 {
5636  struct CoreSentContext *ctx = cls;
5637  struct VirtualLink *vl = ctx->vl;
5638 
5639  if (NULL == vl)
5640  {
5641  /* lost the link in the meantime, ignore */
5642  GNUNET_free (ctx);
5643  return;
5644  }
5647  vl->incoming_fc_window_size_ram -= ctx->size;
5648  vl->incoming_fc_window_size_used += ctx->isize;
5649  consider_sending_fc (vl);
5650  GNUNET_free (ctx);
5651 }
5652 
5653 
5654 static void
5656  const struct GNUNET_MessageHeader *mh,
5657  struct CommunicatorMessageContext *cmc,
5658  unsigned int continue_client)
5659 {
5660  uint16_t size = ntohs (mh->size);
5661  int have_core;
5662 
5663  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5664  {
5666  "# CORE messages dropped (FC arithmetic overflow)",
5667  1,
5668  GNUNET_NO);
5670  "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5671  (unsigned int) ntohs (mh->type),
5672  (unsigned int) ntohs (mh->size));
5673  finish_cmc_handling_with_continue (cmc, continue_client);
5674  return;
5675  }
5677  {
5679  "# CORE messages dropped (FC window overflow)",
5680  1,
5681  GNUNET_NO);
5683  "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5684  (unsigned int) ntohs (mh->type),
5685  (unsigned int) ntohs (mh->size));
5686  finish_cmc_handling_with_continue (cmc, continue_client);
5687  return;
5688  }
5689 
5690  /* Forward to all CORE clients */
5691  have_core = GNUNET_NO;
5692  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5693  {
5694  struct GNUNET_MQ_Envelope *env;
5695  struct InboundMessage *im;
5696  struct CoreSentContext *ctx;
5697 
5698  if (CT_CORE != tc->type)
5699  continue;
5702  ctx = GNUNET_new (struct CoreSentContext);
5703  ctx->vl = vl;
5704  ctx->size = size;
5705  ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5706  have_core = GNUNET_YES;
5709  im->peer = cmc->im.sender;
5710  memcpy (&im[1], mh, size);
5711  GNUNET_MQ_send (tc->mq, env);
5712  vl->core_recv_window--;
5713  }
5714  if (GNUNET_NO == have_core)
5715  {
5717  "Dropped message to CORE: no CORE client connected!\n");
5718  /* Nevertheless, count window as used, as it is from the
5719  perspective of the other peer! */
5721  /* TODO-M1 */
5723  "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
5724  (unsigned int) ntohs (mh->type),
5725  (unsigned int) ntohs (mh->size));
5726  finish_cmc_handling_with_continue (cmc, continue_client);
5727  return;
5728  }
5730  "Delivered message from %s of type %u to CORE recv window %u\n",
5731  GNUNET_i2s (&cmc->im.sender),
5732  ntohs (mh->type),
5733  vl->core_recv_window);
5734  if (vl->core_recv_window > 0)
5735  {
5736  finish_cmc_handling_with_continue (cmc, continue_client);
5737  return;
5738  }
5739  /* Wait with calling #finish_cmc_handling(cmc) until the message
5740  was processed by CORE MQs (for CORE flow control)! */
5742 }
5743 
5744 
5745 
5754 static void
5755 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5756 {
5757  struct CommunicatorMessageContext *cmc = cls;
5758  // struct CommunicatorMessageContext *cmc_copy =
5759  // GNUNET_new (struct CommunicatorMessageContext);
5760  struct GNUNET_MessageHeader *mh_copy;
5761  struct RingBufferEntry *rbe;
5762  struct VirtualLink *vl;
5763  uint16_t size = ntohs (mh->size);
5764 
5766  "Handling raw message of type %u with %u bytes\n",
5767  (unsigned int) ntohs (mh->type),
5768  (unsigned int) ntohs (mh->size));
5769 
5770  if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
5771  (size < sizeof(struct GNUNET_MessageHeader)))
5772  {
5773  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5774 
5775  GNUNET_break (0);
5776  finish_cmc_handling (cmc);
5777  GNUNET_SERVICE_client_drop (client);
5778  return;
5779  }
5780  vl = lookup_virtual_link (&cmc->im.sender);
5781  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5782  {
5783  /* FIXME: sender is giving us messages for CORE but we don't have
5784  the link up yet! I *suspect* this can happen right now (i.e.
5785  sender has verified us, but we didn't verify sender), but if
5786  we pass this on, CORE would be confused (link down, messages
5787  arrive). We should investigate more if this happens often,
5788  or in a persistent manner, and possibly do "something" about
5789  it. Thus logging as error for now. */
5790 
5791  mh_copy = GNUNET_malloc (size);
5792  rbe = GNUNET_new (struct RingBufferEntry);
5793  rbe->cmc = cmc;
5794  /*cmc_copy->tc = cmc->tc;
5795  cmc_copy->im = cmc->im;*/
5796  GNUNET_memcpy (mh_copy, mh, size);
5797 
5798  rbe->mh = mh_copy;
5799 
5800  ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
5801  // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
5802  cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
5804  "Storing message for %s and type %u (%u) in ring buffer\n",
5805  GNUNET_i2s (&cmc->im.sender),
5806  (unsigned int) ntohs (mh->type),
5807  (unsigned int) ntohs (mh_copy->type));
5809  {
5810  ring_buffer_head = 0;
5812  }
5813  else
5814  ring_buffer_head++;
5815 
5817  "%u items stored in ring buffer\n",
5819 
5820  /*GNUNET_break_op (0);
5821  GNUNET_STATISTICS_update (GST_stats,
5822  "# CORE messages dropped (virtual link still down)",
5823  1,
5824  GNUNET_NO);
5825 
5826  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5827  "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
5828  (unsigned int) ntohs (mh->type),
5829  (unsigned int) ntohs (mh->size));
5830  finish_cmc_handling (cmc);*/
5832  // GNUNET_free (cmc);
5833  return;
5834  }
5836 }
5837 
5838 
5846 static int
5848 {
5849  uint16_t size = ntohs (fb->header.size);
5850  uint16_t bsize = size - sizeof(*fb);
5851 
5852  (void) cls;
5853  if (0 == bsize)
5854  {
5855  GNUNET_break_op (0);
5856  return GNUNET_SYSERR;
5857  }
5858  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
5859  {
5860  GNUNET_break_op (0);
5861  return GNUNET_SYSERR;
5862  }
5863  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
5864  {
5865  GNUNET_break_op (0);
5866  return GNUNET_SYSERR;
5867  }
5868  return GNUNET_YES;
5869 }
5870 
5871 
5877 static void
5879 {
5880  struct AcknowledgementCummulator *ac = cls;
5881 
5882  ac->task = NULL;
5883  GNUNET_assert (0 == ac->num_acks);
5884  GNUNET_assert (
5885  GNUNET_YES ==
5887  GNUNET_free (ac);
5888 }
5889 
5890 
5896 static void
5898 {
5899  struct Neighbour *n;
5900  struct VirtualLink *vl;
5901  struct AcknowledgementCummulator *ac = cls;
5902  char buf[sizeof(struct TransportReliabilityAckMessage)
5903  + ac->num_acks
5905  struct TransportReliabilityAckMessage *ack =
5908 
5909  ac->task = NULL;
5911  "Sending ACK with %u components to %s\n",
5912  ac->num_acks,
5913  GNUNET_i2s (&ac->target));
5914  GNUNET_assert (0 < ac->num_acks);
5916  ack->header.size =
5917  htons (sizeof(*ack)
5918  + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
5919  ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
5920  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
5921  for (unsigned int i = 0; i < ac->num_acks; i++)
5922  {
5923  ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
5925  GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5926  }
5927  /*route_control_message_without_fc (
5928  &ac->target,
5929  &ack->header,
5930  RMO_DV_ALLOWED);*/
5931  vl = lookup_virtual_link (&ac->target);
5932  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5933  {
5935  vl,
5936  &ack->header,
5937  RMO_DV_ALLOWED);
5938  }
5939  else
5940  {
5941  /* Use route via neighbour */
5942  n = lookup_neighbour (&ac->target);
5943  if (NULL != n)
5945  n,
5946  &ack->header,
5947  RMO_NONE);
5948  }
5949  ac->num_acks = 0;
5952  ac);
5953 }
5954 
5955 
5964 static void
5966  const struct AcknowledgementUUIDP *ack_uuid,
5967  struct GNUNET_TIME_Absolute max_delay)
5968 {
5969  struct AcknowledgementCummulator *ac;
5970 
5972  "Scheduling ACK %s for transmission to %s\n",
5973  GNUNET_uuid2s (&ack_uuid->value),
5974  GNUNET_i2s (pid));
5976  if (NULL == ac)
5977  {
5979  ac->target = *pid;
5980  ac->min_transmission_time = max_delay;
5984  &ac->target,
5985  ac,
5987  }
5988  else
5989  {
5990  if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
5991  {
5992  /* must run immediately, ack buffer full! */
5994  }
5995  GNUNET_SCHEDULER_cancel (ac->task);
5996  ac->min_transmission_time =
5997  GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
5998  }
5999  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
6000  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
6001  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6002  ac->num_acks++;
6003  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
6005  ac);
6006 }
6007 
6008 
6013 {
6017  struct MessageUUIDP message_uuid;
6018 
6023 };
6024 
6025 
6035 static int
6036 find_by_message_uuid (void *cls, uint32_t key, void *value)
6037 {
6038  struct FindByMessageUuidContext *fc = cls;
6039  struct ReassemblyContext *rc = value;
6040 
6041  (void) key;
6042  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6043  {
6044  fc->rc = rc;
6045  return GNUNET_NO;
6046  }
6047  return GNUNET_YES;
6048 }
6049 
6050 
6058 static void
6060 {
6061  struct CommunicatorMessageContext *cmc = cls;
6062  struct VirtualLink *vl;
6063  struct ReassemblyContext *rc;
6064  const struct GNUNET_MessageHeader *msg;
6065  uint16_t msize;
6066  uint16_t fsize;
6067  uint16_t frag_off;
6068  char *target;
6069  struct GNUNET_TIME_Relative cdelay;
6070  struct FindByMessageUuidContext fc;
6071 
6072  vl = lookup_virtual_link (&cmc->im.sender);
6073  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6074  {
6075  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6076 
6078  "No virtual link for %s to handle fragment\n",
6079  GNUNET_i2s (&cmc->im.sender));
6080  GNUNET_break (0);
6081  finish_cmc_handling (cmc);
6082  GNUNET_SERVICE_client_drop (client);
6083  return;
6084  }
6085  if (NULL == vl->reassembly_map)
6086  {
6088  vl->reassembly_heap =
6093  vl);
6094  }
6095  msize = ntohs (fb->msg_size);
6096  fc.message_uuid = fb->msg_uuid;
6097  fc.rc = NULL;
6099  fb->msg_uuid.uuid,
6101  &fc);
6102  fsize = ntohs (fb->header.size) - sizeof(*fb);
6103  if (NULL == (rc = fc.rc))
6104  {
6105  rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6106  + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6107  rc->msg_uuid = fb->msg_uuid;
6108  rc->virtual_link = vl;
6109  rc->msg_size = msize;
6110  rc->reassembly_timeout =
6114  rc,
6118  vl->reassembly_map,
6119  rc->msg_uuid.uuid,
6120  rc,
6122  target = (char *) &rc[1];
6123  rc->bitfield = (uint8_t *) (target + rc->msg_size);
6124  if (fsize != rc->msg_size)
6125  rc->msg_missing = rc->msg_size;
6126  else
6127  rc->msg_missing = 0;
6129  "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
6130  fsize,
6131  ntohs (fb->frag_off),
6132  msize,
6133  rc->msg_missing,
6134  GNUNET_i2s (&cmc->im.sender),
6135  (unsigned int) fb->msg_uuid.uuid);
6136  }
6137  else
6138  {
6139  target = (char *) &rc[1];
6141  "Received fragment at offset %u/%u from %s for message %u\n",
6142  ntohs (fb->frag_off),
6143  msize,
6144  GNUNET_i2s (&cmc->im.sender),
6145  (unsigned int) fb->msg_uuid.uuid);
6146  }
6147  if (msize != rc->msg_size)
6148  {
6149  GNUNET_break (0);
6150  finish_cmc_handling (cmc);
6151  return;
6152  }
6153 
6154  /* reassemble */
6155  if (0 == fsize)
6156  {
6157  GNUNET_break (0);
6158  finish_cmc_handling (cmc);
6159  return;
6160  }
6161  frag_off = ntohs (fb->frag_off);
6162  if (frag_off + fsize > msize)
6163  {
6164  /* Fragment (plus fragment size) exceeds message size! */
6165  GNUNET_break_op (0);
6166  finish_cmc_handling (cmc);
6167  return;
6168  }
6169  memcpy (&target[frag_off], &fb[1], fsize);
6170  /* update bitfield and msg_missing */
6171  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6172  {
6173  if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6174  {
6175  rc->bitfield[i / 8] |= (1 << (i % 8));
6176  rc->msg_missing--;
6177  }
6178  }
6179 
6180  /* Compute cumulative ACK */
6182  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6183  if (0 == rc->msg_missing)
6184  cdelay = GNUNET_TIME_UNIT_ZERO;
6185  cummulative_ack (&cmc->im.sender,
6186  &fb->ack_uuid,
6189  /* is reassembly complete? */
6190  if (0 != rc->msg_missing)
6191  {
6192  finish_cmc_handling (cmc);
6193  return;
6194  }
6195  /* reassembly is complete, verify result */
6196  msg = (const struct GNUNET_MessageHeader *) &rc[1];
6197  if (ntohs (msg->size) != rc->msg_size)
6198  {
6199  GNUNET_break (0);
6201  finish_cmc_handling (cmc);
6202  return;
6203  }
6204  /* successful reassembly */
6206  "Fragment reassembly complete for message %u\n",
6207  (unsigned int) fb->msg_uuid.uuid);
6208  /* FIXME: check that the resulting msg is NOT a
6209  DV Box or Reliability Box, as that is NOT allowed! */
6210  cmc->mh = msg;
6211  demultiplex_with_cmc (cmc);
6212  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6213  en-route and we forget that we finished this reassembly immediately!
6214  -> keep around until timeout?
6215  -> shorten timeout based on ACK? */
6217 }
6218 
6219 
6227 static int
6229  const struct TransportReliabilityBoxMessage *rb)
6230 {
6231  (void) cls;
6232  const struct GNUNET_MessageHeader *inbox = (const struct
6233  GNUNET_MessageHeader *) &rb[1];
6234 
6236  "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6237  ntohs (rb->header.size),
6238  ntohs (inbox->type),
6239  ntohs (inbox->size),
6240  sizeof (struct TransportReliabilityBoxMessage),
6241  sizeof (struct GNUNET_MessageHeader));
6243  return GNUNET_YES;
6244 }
6245 
6246 
6254 static void
6256  const struct TransportReliabilityBoxMessage *rb)
6257 {
6258  struct CommunicatorMessageContext *cmc = cls;
6259  const struct GNUNET_MessageHeader *inbox =
6260  (const struct GNUNET_MessageHeader *) &rb[1];
6261  struct GNUNET_TIME_Relative rtt;
6262 
6264  "Received reliability box from %s with UUID %s of type %u\n",
6265  GNUNET_i2s (&cmc->im.sender),
6266  GNUNET_uuid2s (&rb->ack_uuid.value),
6267  (unsigned int) ntohs (inbox->type));
6268  rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6269  do not really have an RTT for the
6270  * incoming* queue (should we have
6271  the sender add it to the rb message?) */
6272  cummulative_ack (
6273  &cmc->im.sender,
6274  &rb->ack_uuid,
6275  (0 == ntohl (rb->ack_countdown))
6278  GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6279  /* continue with inner message */
6280  /* FIXME: check that inbox is NOT a DV Box, fragment or another
6281  reliability box (not allowed!) */
6282  cmc->mh = inbox;
6283  demultiplex_with_cmc (cmc);
6284 }
6285 
6286 
6295 static void
6296 update_pd_age (struct PerformanceData *pd, unsigned int age)
6297 {
6298  unsigned int sage;
6299 
6300  if (age == pd->last_age)
6301  return; /* nothing to do */
6302  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6303  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6304  {
6305  struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6306 
6307  the->bytes_sent = 0;
6308  the->bytes_received = 0;
6309  }
6310  pd->last_age = age;
6311 }
6312 
6313 
6322 static void
6324  struct GNUNET_TIME_Relative rtt,
6325  uint16_t bytes_transmitted_ok)
6326 {
6327  uint64_t nval = rtt.rel_value_us;
6328  uint64_t oval = pd->aged_rtt.rel_value_us;
6329  unsigned int age = get_age ();
6330  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6331 
6332  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6333  pd->aged_rtt = rtt;
6334  else
6335  pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
6336  update_pd_age (pd, age);
6337  the->bytes_received += bytes_transmitted_ok;
6338 }
6339 
6340 
6348 static void
6350  struct GNUNET_TIME_Relative rtt,
6351  uint16_t bytes_transmitted_ok)
6352 {
6353  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6354 }
6355 
6356 
6364 static void
6366  struct GNUNET_TIME_Relative rtt,
6367  uint16_t bytes_transmitted_ok)
6368 {
6369  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6370 }
6371 
6372 
6380 static void
6382 {
6383  struct PendingMessage *pos;
6384 
6386  "Complete transmission of message %llu %u\n",
6387  pm->logging_uuid,
6388  pm->pmt);
6389  switch (pm->pmt)
6390  {
6391  case PMT_CORE:
6392  case PMT_RELIABILITY_BOX:
6393  /* Full message sent, we are done */
6395  return;
6396 
6397  case PMT_FRAGMENT_BOX:
6398  /* Fragment sent over reliable channel */
6399  pos = pm->frag_parent;
6400  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6402  /* check if subtree is done */
6403  while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
6404  (NULL != pos->frag_parent))
6405  {
6406  pm = pos;
6407  pos = pm->frag_parent;
6408  if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6409  {
6411  return;
6412  }
6413  else if (PMT_DV_BOX == pm->pmt)
6414  {
6415  client_send_response (pos);
6416  return;
6417  }
6418  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6420  }
6421 
6422  /* Was this the last applicable fragment? */
6423  if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
6424  (pos->frag_off == pos->bytes_msg))
6425  client_send_response (pos);
6426  return;
6427 
6428  case PMT_DV_BOX:
6430  "Completed transmission of message %llu (DV Box)\n",
6431  pm->logging_uuid);
6432  if (NULL != pm->frag_parent)
6433  {
6434  if (NULL != pm->bpm)
6435  {
6436  GNUNET_free (pm->bpm);
6438  "Freed bpm\n");
6439  }
6440  pos = pm->frag_parent;
6442  pos->bpm = NULL;
6443  client_send_response (pos);
6444  }
6445  else
6447  return;
6448  }
6449 }
6450 
6451 
6459 static void
6461  struct GNUNET_TIME_Relative ack_delay)
6462 {
6463  struct GNUNET_TIME_Relative delay;
6464 
6466  if (delay.rel_value_us > ack_delay.rel_value_us)
6468  else
6469  delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6470  if (NULL != pa->queue)
6472  if (NULL != pa->dvh)
6474  if (NULL != pa->pm)
6477 }
6478 
6479 
6487 static int
6489  const struct TransportReliabilityAckMessage *ra)
6490 {
6491  unsigned int n_acks;
6492 
6493  (void) cls;
6494  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6495  / sizeof(struct TransportCummulativeAckPayloadP);
6496  if (0 == n_acks)
6497  {
6498  GNUNET_break_op (0);
6499  return GNUNET_SYSERR;
6500  }
6501  if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6502  n_acks * sizeof(struct TransportCummulativeAckPayloadP))
6503  {
6504  GNUNET_break_op (0);
6505  return GNUNET_SYSERR;
6506  }
6507  return GNUNET_OK;
6508 }
6509 
6510 
6518 static void
6520  const struct TransportReliabilityAckMessage *ra)
6521 {
6522  struct CommunicatorMessageContext *cmc = cls;
6523  const struct TransportCummulativeAckPayloadP *ack;
6524  unsigned int n_acks;
6525  uint32_t ack_counter;
6526 
6527  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6528  / sizeof(struct TransportCummulativeAckPayloadP);
6529  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
6530  for (unsigned int i = 0; i < n_acks; i++)
6531  {
6532  struct PendingAcknowledgement *pa =
6534  if (NULL == pa)
6535  {
6537  "Received ACK from %s with UUID %s which is unknown to us!\n",
6538  GNUNET_i2s (&cmc->im.sender),
6539  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6541  GST_stats,
6542  "# FRAGMENT_ACKS dropped, no matching pending message",
6543  1,
6544  GNUNET_NO);
6545  continue;
6546  }
6548  "Received ACK from %s with UUID %s\n",
6549  GNUNET_i2s (&cmc->im.sender),
6550  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6551  handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
6552  }
6553 
6554  ack_counter = htonl (ra->ack_counter);
6555  (void) ack_counter; /* silence compiler warning for now */
6556  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
6557  // (DV and/or Neighbour?)
6558  finish_cmc_handling (cmc);
6559 }
6560 
6561 
6569 static int
6571  void *cls,
6573 {
6574  uint16_t size = ntohs (be->header.size) - sizeof(*be);
6575  const struct GNUNET_MessageHeader *inbox =
6576  (const struct GNUNET_MessageHeader *) &be[1];
6577  const char *is;
6578  uint16_t isize;
6579 
6580  (void) cls;
6581  if (ntohs (inbox->size) >= size)
6582  {
6583  GNUNET_break_op (0);
6584  return GNUNET_SYSERR;
6585  }
6586  isize = ntohs (inbox->size);
6587  is = ((const char *) inbox) + isize;
6588  size -= isize;
6589  if ('\0' != is[size - 1])
6590  {
6591  GNUNET_break_op (0);
6592  return GNUNET_SYSERR;
6593  }
6594  return GNUNET_YES;
6595 }
6596 
6597 
6606 static void
6608  void *cls,
6610 {
6611  struct CommunicatorMessageContext *cmc = cls;
6612  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
6613  struct GNUNET_MQ_Envelope *env;
6614  struct TransportClient *tc;
6615  const struct GNUNET_MessageHeader *inbox =
6616  (const struct GNUNET_MessageHeader *) &be[1];
6617  uint16_t isize = ntohs (inbox->size);
6618  const char *target_communicator = ((const char *) inbox) + isize;
6619  char *sender;
6620  char *self;
6621 
6622  GNUNET_asprintf (&sender,
6623  "%s",
6624  GNUNET_i2s (&cmc->im.sender));
6625  GNUNET_asprintf (&self,
6626  "%s",
6628 
6629  /* Find client providing this communicator */
6630  for (tc = clients_head; NULL != tc; tc = tc->next)
6631  if ((CT_COMMUNICATOR == tc->type) &&
6632  (0 ==
6633  strcmp (tc->details.communicator.address_prefix, target_communicator)))
6634  break;
6635  if (NULL == tc)
6636  {
6637  char *stastr;
6638 
6639  GNUNET_asprintf (
6640  &stastr,
6641  "# Backchannel message dropped: target communicator `%s' unknown",
6642  target_communicator);
6644  GNUNET_free (stastr);
6645  finish_cmc_handling (cmc);
6646  return;
6647  }
6648  /* Finally, deliver backchannel message to communicator */
6650  "Delivering backchannel message from %s to %s of type %u to %s\n",
6651  sender,
6652  self,
6653  ntohs (inbox->type),
6654  target_communicator);
6656  cbi,
6657  isize,
6659  cbi->pid = cmc->im.sender;
6660  memcpy (&cbi[1], inbox, isize);
6661  GNUNET_MQ_send (tc->mq, env);
6662  finish_cmc_handling (cmc);
6663 }
6664 
6665 
6675 static void
6676 path_cleanup_cb (void *cls)
6677 {
6678  struct DistanceVector *dv = cls;
6679  struct DistanceVectorHop *pos;
6680 
6681  dv->timeout_task = NULL;
6682  while (NULL != (pos = dv->dv_head))
6683  {
6684  GNUNET_assert (dv == pos->dv);
6686  break;
6688  }
6689  if (NULL == pos)
6690  {
6691  free_dv_route (dv);
6692  return;
6693  }
6694  dv->timeout_task =
6696 }
6697 
6698 
6699 static void send_msg_from_cache (struct VirtualLink *vl)
6700 {
6701 
6702  const struct GNUNET_PeerIdentity target = vl->target;
6703 
6704 
6706  {
6707  struct RingBufferEntry *ring_buffer_copy[RING_BUFFER_SIZE];
6708  unsigned int tail = GNUNET_YES == is_ring_buffer_full ? ring_buffer_head :
6709  0;
6710  unsigned int head = GNUNET_YES == is_ring_buffer_full ? RING_BUFFER_SIZE :
6712  struct GNUNET_TRANSPORT_IncomingMessage im;
6713  struct CommunicatorMessageContext *cmc;
6714  struct RingBufferEntry *rbe;
6715  struct GNUNET_MessageHeader *mh;
6716 
6718  "Sending from ring buffer, which has %u items\n",
6720 
6721  ring_buffer_head = 0;
6722  for (unsigned int i = 0; i < head; i++)
6723  {
6724  rbe = ring_buffer[(i + tail) % RING_BUFFER_SIZE];
6725  cmc = rbe->cmc;
6726  mh = rbe->mh;
6727 
6728  im = cmc->im;
6729  // mh = cmc->mh;
6731  "Sending to ring buffer target %s using vl target %s\n",
6732  GNUNET_i2s (&im.sender),
6733  GNUNET_i2s2 (&target));
6734  if (0 == GNUNET_memcmp (&target, &im.sender))
6735  {
6737  "Finish handling message of type %u and size %u\n",
6738  (unsigned int) ntohs (mh->type),
6739  (unsigned int) ntohs (mh->size));
6741  GNUNET_free (mh);
6742  }
6743  else
6744  {
6745  ring_buffer_copy[i] = rbe;
6746  ring_buffer_head++;
6747  }
6748  }
6749 
6752  {
6754  }
6755 
6756  for (unsigned int i = 0; i < ring_buffer_head; i++)
6757  {
6758  ring_buffer[i] = ring_buffer_copy[i];
6760  "ring_buffer_copy[i]->mh->type for i %u %u\n",
6761  i,
6762  ring_buffer_copy[i]->mh->type);
6764  "ring_buffer[i]->mh->type for i %u %u\n",
6765  i,
6766  ring_buffer[i]->mh->type);
6767  }
6768 
6770  "%u items still in ring buffer\n",
6772  }
6773 
6775  {
6776  struct PendingMessage *ring_buffer_dv_copy[RING_BUFFER_SIZE];
6777  struct PendingMessage *pm;
6778  unsigned int tail = GNUNET_YES == is_ring_buffer_dv_full ?
6780  0;
6781  unsigned int head = GNUNET_YES == is_ring_buffer_dv_full ?
6784 
6786  "Sending from ring buffer dv, which has %u items\n",
6788 
6789  ring_buffer_dv_head = 0;
6790  for (unsigned int i = 0; i < head; i++)
6791  {
6793 
6795  "Sending to ring buffer target %s using vl target %s\n",
6796  GNUNET_i2s (&pm->target),
6797  GNUNET_i2s2 (&target));
6798  if (0 == GNUNET_memcmp (&target, &pm->target))
6799  {
6801  "Adding PendingMessage to vl, checking transmission.\n");
6802  pm->vl = vl;
6806  pm);
6807 
6809  }
6810  else
6811  {
6812  ring_buffer_dv_copy[i] = pm;
6814  }
6815  }
6816 
6818  {
6820  }
6821 
6822  for (unsigned int i = 0; i < ring_buffer_dv_head; i++)
6823  ring_buffer_dv[i] = ring_buffer_dv_copy[i];
6824 
6826  "%u items still in ring buffer dv.\n",
6828 
6829  }
6830 }
6831 
6832 
6840 static void
6842 {
6843  struct DistanceVector *dv = hop->dv;
6844  struct VirtualLink *vl;
6845 
6846  vl = lookup_virtual_link (&dv->target);
6847  if (NULL == vl)
6848  {
6849 
6850  vl = GNUNET_new (struct VirtualLink);
6852  "Creating new virtual link %p to %s using DV!\n",
6853  vl,
6854  GNUNET_i2s (&dv->target));
6855  vl->confirmed = GNUNET_YES;
6856  vl->message_uuid_ctr =
6858  vl->target = dv->target;
6864  links,
6865  &vl->target,
6866  vl,
6868  vl->dv = dv;
6869  dv->vl = vl;
6870  vl->visibility_task =
6872  consider_sending_fc (vl);
6873  /* We lacked a confirmed connection to the target
6874  before, so tell CORE about it (finally!) */
6876  send_msg_from_cache (vl);
6877  }
6878  else
6879  {
6880  /* Link was already up, remember dv is also now available and we are done */
6881  vl->dv = dv;
6882  dv->vl = vl;
6883  if (GNUNET_NO == vl->confirmed)
6884  {
6885  vl->confirmed = GNUNET_YES;
6886  vl->visibility_task =
6888  consider_sending_fc (vl);
6889  /* We lacked a confirmed connection to the target
6890  before, so tell CORE about it (finally!) */
6892  send_msg_from_cache (vl);
6893  }
6894  else
6896  "Virtual link to %s could now also use DV!\n",
6897  GNUNET_i2s (&dv->target));
6898  }
6899 }
6900 
6901 
6927 static int
6929  unsigned int path_len,
6930  struct GNUNET_TIME_Relative network_latency,
6931  struct GNUNET_TIME_Absolute path_valid_until)
6932 {
6933  struct DistanceVectorHop *hop;
6934  struct DistanceVector *dv;
6935  struct Neighbour *next_hop;
6936  unsigned int shorter_distance;
6937 
6938  if (path_len < 3)
6939  {
6940  /* what a boring path! not allowed! */
6941  GNUNET_break (0);
6942  return GNUNET_SYSERR;
6943  }
6944  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
6945  next_hop = lookup_neighbour (&path[1]);
6946  if (NULL == next_hop)
6947  {
6948  /* next hop must be a neighbour, otherwise this whole thing is useless! */
6949  GNUNET_break (0);
6950  return GNUNET_SYSERR;
6951  }
6952  for (unsigned int i = 2; i < path_len; i++)
6953  if (NULL != lookup_neighbour (&path[i]))
6954  {
6955  /* Useless path: we have a direct connection to some hop
6956  in the middle of the path, so this one is not even
6957  terribly useful for redundancy */
6959  "Path of %u hops useless: directly link to hop %u (%s)\n",
6960  path_len,
6961  i,
6962  GNUNET_i2s (&path[i]));
6964  "# Useless DV path ignored: hop is neighbour",
6965  1,
6966  GNUNET_NO);
6967  return GNUNET_SYSERR;
6968  }
6969  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
6970  if (NULL == dv)
6971  {
6972  dv = GNUNET_new (struct DistanceVector);
6973  dv->target = path[path_len - 1];
6975  &path_cleanup_cb,
6976  dv);
6979  dv_routes,
6980  &dv->target,
6981  dv,
6983  }
6984  /* Check if we have this path already! */
6985  shorter_distance = 0;
6986  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
6987  pos = pos->next_dv)
6988  {
6989  if (pos->distance < path_len - 3)
6990  shorter_distance++;
6991  /* Note that the distances in 'pos' excludes us (path[0]),
6992  the next_hop (path[1]) and the target so we need to subtract three
6993  and check next_hop explicitly */
6994  if ((pos->distance == path_len - 3) && (pos->next_hop == next_hop))
6995  {
6996  int match = GNUNET_YES;
6997 
6998  for (unsigned int i = 0; i < pos->distance; i++)
6999  {
7000  if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
7001  {
7002  match = GNUNET_NO;
7003  break;
7004  }
7005  }
7006  if (GNUNET_YES == match)
7007  {
7008  struct GNUNET_TIME_Relative last_timeout;
7009 
7010  /* Re-discovered known path, update timeout */
7012  "# Known DV path refreshed",
7013  1,
7014  GNUNET_NO);
7015  last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
7016  pos->timeout =
7018  pos->path_valid_until =
7019  GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
7020  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
7021  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
7022  if (0 <
7025  if (last_timeout.rel_value_us <
7028  .rel_value_us)
7029  {
7030  /* Some peer send DV learn messages too often, we are learning
7031  the same path faster than it would be useful; do not forward! */
7033  "Rediscovered path too quickly, not forwarding further\n");
7034  return GNUNET_NO;
7035  }
7037  "Refreshed known path to %s valid until %s, forwarding further\n",
7038  GNUNET_i2s (&dv->target),
7040  pos->path_valid_until));
7041  return GNUNET_YES;
7042  }
7043  }
7044  }
7045  /* Count how many shorter paths we have (incl. direct
7046  neighbours) before simply giving up on this one! */
7047  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
7048  {
7049  /* We have a shorter path already! */
7051  "Have many shorter DV paths %s, not forwarding further\n",
7052  GNUNET_i2s (&dv->target));
7053  return GNUNET_NO;
7054  }
7055  /* create new DV path entry */
7057  "Discovered new DV path to %s valid until %s\n",
7058  GNUNET_i2s (&dv->target),
7059  GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
7060  hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
7061  + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
7062  hop->next_hop = next_hop;
7063  hop->dv = dv;
7064  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
7065  memcpy (&hop[1],
7066  &path[2],
7067  sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
7069  hop->path_valid_until = path_valid_until;
7070  hop->distance = path_len - 3;
7071  hop->pd.aged_rtt = network_latency;
7072  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
7073  GNUNET_CONTAINER_MDLL_insert (neighbour,
7074  next_hop->dv_head,
7075  next_hop->dv_tail,
7076  hop);
7077  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
7079  return GNUNET_YES;
7080 }
7081 
7082 
7090 static int
7091 check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7092 {
7093  uint16_t size = ntohs (dvl->header.size);
7094  uint16_t num_hops = ntohs (dvl->num_hops);
7095  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
7096 
7097  (void) cls;
7098  if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
7099  {
7100  GNUNET_break_op (0);
7101  return GNUNET_SYSERR;
7102  }
7103  if (num_hops > MAX_DV_HOPS_ALLOWED)
7104  {
7105  GNUNET_break_op (0);
7106  return GNUNET_SYSERR;
7107  }
7108  for (unsigned int i = 0; i < num_hops; i++)
7109  {
7110  if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
7111  {
7112  GNUNET_break_op (0);
7113  return GNUNET_SYSERR;
7114  }
7115  if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
7116  {
7117  GNUNET_break_op (0);
7118  return GNUNET_SYSERR;
7119  }
7120  }
7121  return GNUNET_YES;
7122 }
7123 
7124 
7136 static void
7137 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
7138  const struct TransportDVLearnMessage *msg,
7139  uint16_t bi_history,
7140  uint16_t nhops,
7141  const struct DVPathEntryP *hops,
7142  struct GNUNET_TIME_Absolute in_time)
7143 {
7144  struct Neighbour *n;
7145  struct VirtualLink *vl;
7146  struct DVPathEntryP *dhops;
7147  char buf[sizeof(struct TransportDVLearnMessage)
7148  + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
7149  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
7150  struct GNUNET_TIME_Relative nnd;
7151 
7152  /* compute message for forwarding */
7154  "Forwarding DV learn message originating from %s to %s\n",
7155  GNUNET_i2s (&msg->initiator),
7156  GNUNET_i2s2 (next_hop));
7159  fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
7160  + (nhops + 1) * sizeof(struct DVPathEntryP));
7161  fwd->num_hops = htons (nhops + 1);
7162  fwd->bidirectional = htons (bi_history);
7165  msg->non_network_delay));
7167  fwd->init_sig = msg->init_sig;
7168  fwd->initiator = msg->initiator;
7169  fwd->challenge = msg->challenge;
7170  fwd->monotonic_time = msg->monotonic_time;
7171  dhops = (struct DVPathEntryP *) &fwd[1];
7172  GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
7173  dhops[nhops].hop = GST_my_identity;
7174  {
7175  struct DvHopPS dhp = {
7177  .purpose.size = htonl (sizeof(dhp)),
7178  .pred = (0 == nhops) ? msg->initiator : dhops[nhops - 1].hop,
7179  .succ = *next_hop,
7180  .challenge = msg->challenge
7181  };
7183  &dhp,
7184  &dhops[nhops].hop_sig);
7185  }
7186  /*route_control_message_without_fc (next_hop,
7187  &fwd->header,
7188  RMO_UNCONFIRMED_ALLOWED);*/
7189  vl = lookup_virtual_link (next_hop);
7190  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
7191  {
7193  &fwd->header,
7195  }
7196  else
7197  {
7198  /* Use route via neighbour */
7199  n = lookup_neighbour (next_hop);
7200  if (NULL != n)
7202  n,
7203  &fwd->header,
7205  }
7206 }
7207 
7208 
7218 static int
7220  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
7221  const struct GNUNET_PeerIdentity *init,
7223  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
7224 {
7225  struct DvInitPS ip = { .purpose.purpose = htonl (
7227  .purpose.size = htonl (sizeof(ip)),
7228  .monotonic_time = sender_monotonic_time,
7229  .challenge = *challenge };
7230 
7231  if (
7232  GNUNET_OK !=
7234  &ip,
7235  init_sig,
7236  &init->public_key))
7237  {
7238  GNUNET_break_op (0);
7239  return GNUNET_SYSERR;
7240  }
7241  return GNUNET_OK;
7242 }
7243 
7244 
7249 {
7254 
7258  const struct DVPathEntryP *hops;
7259 
7264 
7269 
7273  unsigned int num_eligible;
7274 
7278  unsigned int num_selections;
7279 
7283  uint16_t nhops;
7284 
7288  uint16_t bi_history;
7289 };
7290 
7291 
7300 static int
7302  const struct GNUNET_PeerIdentity *pid,
7303  void *value)
7304 {
7305  struct NeighbourSelectionContext *nsc = cls;
7306 
7307  (void) value;
7308  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7309  return GNUNET_YES; /* skip initiator */
7310  for (unsigned int i = 0; i < nsc->nhops; i++)
7311  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7312  return GNUNET_YES;
7313  /* skip peers on path */
7314  nsc->num_eligible++;
7315  return GNUNET_YES;
7316 }
7317 
7318 
7329 static int
7331  const struct GNUNET_PeerIdentity *pid,
7332  void *value)
7333 {
7334  struct NeighbourSelectionContext *nsc = cls;
7335 
7337  "transmission %s\n",
7338  GNUNET_i2s (pid));
7339  (void) value;
7340  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7341  return GNUNET_YES; /* skip initiator */
7342  for (unsigned int i = 0; i < nsc->nhops; i++)
7343  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7344  return GNUNET_YES;
7345  /* skip peers on path */
7346  for (unsigned int i = 0; i < nsc->num_selections; i++)
7347  {
7348  if (nsc->selections[i] == nsc->num_eligible)
7349  {
7351  nsc->dvl,
7352  nsc->bi_history,
7353  nsc->nhops,
7354  nsc->hops,
7355  nsc->in_time);
7356  break;
7357  }
7358  }
7359  nsc->num_eligible++;
7360  return GNUNET_YES;
7361 }
7362 
7363 
7407 static unsigned int
7408 calculate_fork_degree (unsigned int hops_taken,
7409  unsigned int neighbour_count,
7410  unsigned int eligible_count)
7411 {
7412  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
7413  double eligible_ratio =
7414  ((double) eligible_count) / ((double) neighbour_count);
7415  double boost_factor = eligible_ratio * eligible_ratio;
7416  unsigned int rnd;
7417  double left;
7418 
7419  if (hops_taken >= 64)
7420  {
7421  GNUNET_break (0);
7422  return 0; /* precaution given bitshift below */
7423  }
7424  for (unsigned int i = 1; i < hops_taken; i++)
7425  {
7426  /* For each hop, subtract the expected number of targets
7427  reached at distance d (so what remains divided by 2^d) */
7428  target_total -= (target_total * boost_factor / (1LLU << i));
7429  }
7430  rnd =
7431  (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
7432  /* round up or down probabilistically depending on how close we were
7433  when floor()ing to rnd */
7434  left = target_total - (double) rnd;
7435  if (UINT32_MAX * left >
7437  rnd++; /* round up */
7439  "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
7440  hops_taken,
7441  rnd,
7442  eligible_count,
7443  neighbour_count);
7444  return rnd;
7445 }
7446 
7447 
7454 static void
7455 neighbour_store_dvmono_cb (void *cls, int success)
7456 {
7457  struct Neighbour *n = cls;
7458 
7459  n->sc = NULL;
7460  if (GNUNET_YES != success)
7461  GNUNET_log (