GNUnet  0.19.5
gnunet-service-tng.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
75 #include "platform.h"
76 #include "gnunet_util_lib.h"
80 #include "gnunet_hello_lib.h"
81 #include "gnunet_signatures.h"
82 #include "transport.h"
83 
87 #define RING_BUFFER_SIZE 16
88 
92 #define MAX_FC_RETRANSMIT_COUNT 1000
93 
98 #define MAX_CUMMULATIVE_ACKS 64
99 
112 #define FC_NO_CHANGE_REPLY_PROBABILITY 8
113 
118 #define IN_PACKET_SIZE_WITHOUT_MTU 128
119 
124 #define GOODPUT_AGING_SLOTS 4
125 
130 #define DEFAULT_WINDOW_SIZE (128 * 1024)
131 
140 #define MAX_INCOMING_REQUEST 16
141 
146 #define MAX_DV_DISCOVERY_SELECTION 16
147 
156 #define RECV_WINDOW_SIZE 4
157 
165 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
166 
170 #define MAX_DV_HOPS_ALLOWED 16
171 
176 #define MAX_DV_LEARN_PENDING 64
177 
181 #define MAX_DV_PATHS_TO_TARGET 3
182 
188 #define DELAY_WARN_THRESHOLD \
189  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
190 
195 #define DV_FORWARD_TIMEOUT \
196  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
197 
201 #define DEFAULT_ACK_WAIT_DURATION \
202  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
203 
209 #define DV_QUALITY_RTT_THRESHOLD \
210  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
211 
216 #define DV_PATH_VALIDITY_TIMEOUT \
217  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
218 
223 #define BACKCHANNEL_INACTIVITY_TIMEOUT \
224  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
225 
230 #define DV_PATH_DISCOVERY_FREQUENCY \
231  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
232 
236 #define EPHEMERAL_VALIDITY \
237  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
238 
242 #define REASSEMBLY_EXPIRATION \
243  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
244 
249 #define FAST_VALIDATION_CHALLENGE_FREQ \
250  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
251 
255 #define MAX_VALIDATION_CHALLENGE_FREQ \
256  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
257 
263 #define ACK_CUMMULATOR_TIMEOUT \
264  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
265 
270 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
271 
276 #define DV_LEARN_QUALITY_THRESHOLD 100
277 
281 #define MAX_ADDRESS_VALID_UNTIL \
282  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
283 
287 #define ADDRESS_VALIDATION_LIFETIME \
288  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
289 
296 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
297 
304 #define VALIDATION_RTT_BUFFER_FACTOR 3
305 
312 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
313 
319 #define QUEUE_LENGTH_LIMIT 32
320 
321 
323 
328 {
333  uint64_t uuid GNUNET_PACKED;
334 };
335 
336 
341 {
345  struct GNUNET_Uuid value;
346 };
347 
352 {
357 
358  /* Followed by *another* message header which is the message to
359  the communicator */
360 
361  /* Followed by a 0-terminated name of the communicator */
362 };
363 
364 
369 {
374 
390 
395 
401 };
402 
403 
409 {
414 
420 
432 
433  /* Followed by a `struct GNUNET_MessageHeader` with a message
434  for the target peer */
435 };
436 
437 
443 {
448 
456 
463 };
464 
465 
470 {
478 
483 };
484 
485 
494 {
499 
505 
506  /* followed by any number of `struct TransportCummulativeAckPayloadP`
507  messages providing ACKs */
508 };
509 
510 
515 {
520 
525 
530 
539 
544  struct MessageUUIDP msg_uuid;
545 };
546 
547 
565 struct DvInitPS
566 {
571 
585 
590 };
591 
592 
609 struct DvHopPS
610 {
615 
619  struct GNUNET_PeerIdentity pred;
620 
624  struct GNUNET_PeerIdentity succ;
625 
630 };
631 
632 
638 {
642  struct GNUNET_PeerIdentity hop;
643 
649 };
650 
651 
666 {
671 
677 
687 
694 
708 
714 
719 
724 
725  /* Followed by @e num_hops `struct DVPathEntryP` values,
726  excluding the initiator of the DV trace; the last entry is the
727  current sender; the current peer must not be included. */
728 };
729 
730 
754 {
759 
763  unsigned int without_fc;
764 
772 
779 
785 
791  struct GNUNET_ShortHashCode iv;
792 
798  struct GNUNET_HashCode hmac;
799 
806 
807  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
808  excluding the @e origin and the current peer, the last must be
809  the ultimate target; if @e num_hops is zero, the receiver of this
810  message is the ultimate target. */
811 
812  /* Followed by encrypted, variable-size payload, which
813  must begin with a `struct TransportDVBoxPayloadP` */
814 
815  /* Followed by the actual message, which itself must not be
816  a DV_LEARN or DV_BOX message! */
817 };
818 
819 
825 {
830 
835 
840 
846 };
847 
848 
854 {
859 
865 
870 };
871 
872 
878 {
883 
888 
894 
899 
904  struct GNUNET_TIME_AbsoluteNBO origin_time;
905 
910  struct GNUNET_TIME_RelativeNBO validity_duration;
911 };
912 
913 
923 {
928 
936  uint32_t seq GNUNET_PACKED;
937 
943 
950 
960 
970 };
971 
972 
974 
975 
980 {
984  CT_NONE = 0,
985 
989  CT_CORE = 1,
990 
995 
1000 
1004  CT_APPLICATION = 4
1005 };
1006 
1007 
1013 {
1018 
1023 
1028 
1033 
1039  RMO_REDUNDANT = 4
1040 };
1041 
1042 
1047 {
1052 
1057 
1062 
1068 };
1069 
1070 
1076 {
1080  uint64_t bytes_sent;
1081 
1086  uint64_t bytes_received;
1087 };
1088 
1089 
1094 {
1099 
1105 
1110  unsigned int last_age;
1111 };
1112 
1113 
1117 struct TransportClient;
1118 
1122 struct Neighbour;
1123 
1128 struct DistanceVector;
1129 
1134 struct Queue;
1135 
1139 struct PendingMessage;
1140 
1144 struct DistanceVectorHop;
1145 
1154 struct VirtualLink;
1155 
1156 
1162 {
1168 
1174 
1179 
1183  struct GNUNET_TRANSPORT_IncomingMessage im;
1184 
1188  const struct GNUNET_MessageHeader *mh;
1189 
1194  uint16_t total_hops;
1195 };
1196 
1197 
1202 {
1207 
1212 };
1213 
1214 
1219 {
1224 
1229 
1233  struct VirtualLink *vl;
1234 
1238  uint16_t size;
1239 
1246  uint16_t isize;
1247 };
1248 
1249 
1254 {
1259  struct MessageUUIDP msg_uuid;
1260 
1265 
1270 
1278  uint8_t *bitfield;
1279 
1284 
1290 
1294  uint16_t msg_size;
1295 
1300  uint16_t msg_missing;
1301 
1302  /* Followed by @e msg_size bytes of the (partially) defragmented original
1303  * message */
1304 
1305  /* Followed by @e bitfield data */
1306 };
1307 
1308 
1318 {
1322  struct GNUNET_PeerIdentity target;
1323 
1330 
1337 
1342 
1348 
1354 
1359 
1364 
1369 
1374 
1382 
1388 
1392  unsigned int fc_retransmit_count;
1393 
1398  unsigned int confirmed;
1399 
1403  struct Neighbour *n;
1404 
1409 
1416 
1423 
1432 
1438 
1444 
1453 
1461 
1468 
1477 
1490 
1496 
1503 
1514 
1519  uint32_t fc_seq_gen;
1520 
1526  uint32_t last_fc_seq;
1527 
1540 };
1541 
1542 
1547 {
1553 
1559 
1566 
1573 
1580 
1587 
1594 
1601 
1606 
1612 
1618 
1623  struct Queue *queue;
1624 
1629 
1633  uint16_t message_size;
1634 
1638  unsigned int num_send;
1639 };
1640 
1641 
1646 {
1651 
1656 
1661 
1666 
1671 
1676 
1681 
1686 
1692  const struct GNUNET_PeerIdentity *path;
1693 
1699 
1708 
1712  struct PerformanceData pd;
1713 
1719  unsigned int distance;
1720 };
1721 
1722 
1728 {
1732  struct GNUNET_PeerIdentity target;
1733 
1738 
1743 
1748 
1753  struct VirtualLink *vl;
1754 
1760 
1765 
1770 
1775 
1780 };
1781 
1782 
1792 struct QueueEntry
1793 {
1797  struct QueueEntry *next;
1798 
1802  struct QueueEntry *prev;
1803 
1807  struct Queue *queue;
1808 
1813 
1817  uint64_t mid;
1818 };
1819 
1820 
1825 struct Queue
1826 {
1831 
1836 
1841 
1846 
1851 
1856 
1861 
1866 
1871 
1876 
1880  const char *address;
1881 
1885  unsigned int unlimited_length;
1886 
1892 
1901 
1905  struct PerformanceData pd;
1906 
1911  uint64_t mid_gen;
1912 
1916  uint32_t qid;
1917 
1921  uint32_t mtu;
1922 
1927 
1932 
1936  unsigned int queue_length;
1937 
1941  uint64_t q_capacity;
1942 
1946  uint32_t priority;
1947 
1951  enum GNUNET_NetworkType nt;
1952 
1957 
1962  int idle;
1963 };
1964 
1965 
1969 struct Neighbour
1970 {
1974  struct GNUNET_PeerIdentity pid;
1975 
1981 
1987 
1992 
1997 
2003 
2009 
2014  struct VirtualLink *vl;
2015 
2021 
2027 };
2028 
2029 
2035 {
2040 
2045 
2050 
2054  struct GNUNET_PeerIdentity pid;
2055 };
2056 
2057 
2061 struct PeerRequest
2062 {
2066  struct GNUNET_PeerIdentity pid;
2067 
2072 
2077 
2084 
2089 };
2090 
2091 
2096 {
2101 
2106 
2111 
2115  PMT_DV_BOX = 3
2116 };
2117 
2118 
2145 struct PendingMessage
2146 {
2151 
2156 
2161 
2166 
2172 
2178 
2183 
2188 
2194 
2199  struct VirtualLink *vl;
2200 
2204  struct GNUNET_PeerIdentity target;
2205 
2214  struct QueueEntry *qe;
2215 
2220 
2225 
2230 
2235 
2240 
2245 
2250  struct MessageUUIDP msg_uuid;
2251 
2256  unsigned long long logging_uuid;
2257 
2261  enum PendingMessageType pmt;
2262 
2268 
2273 
2277  uint16_t bytes_msg;
2278 
2282  uint16_t frag_off;
2283 
2287  unsigned int frags_in_flight;
2288 
2292  uint16_t frag_count;
2293 
2297  int16_t msg_uuid_set;
2298 
2299  /* Followed by @e bytes_msg to transmit */
2300 };
2301 
2302 
2307 {
2313 
2318 };
2319 
2320 
2326 {
2330  struct GNUNET_PeerIdentity target;
2331 
2336 
2343 
2348 
2354  uint32_t ack_counter;
2355 
2359  unsigned int num_acks;
2360 };
2361 
2362 
2367 {
2372 
2377 
2382 
2386  const char *address;
2387 
2392 
2397 
2403 
2407  uint32_t aid;
2408 
2412  enum GNUNET_NetworkType nt;
2413 };
2414 
2415 
2420 {
2425 
2430 
2435 
2440 
2444  enum ClientType type;
2445 
2446  union
2447  {
2451  struct
2452  {
2458 
2463  } core;
2464 
2468  struct
2469  {
2475  struct GNUNET_PeerIdentity peer;
2476 
2482 
2483 
2487  struct
2488  {
2494 
2499 
2504 
2510 
2516 
2522  unsigned int total_queue_length;
2523 
2529 
2533  struct
2534  {
2542 };
2543 
2544 
2550 {
2555  struct GNUNET_PeerIdentity pid;
2556 
2564 
2570 
2577  struct GNUNET_TIME_Absolute first_challenge_use;
2578 
2585  struct GNUNET_TIME_Absolute last_challenge_use;
2586 
2594  struct GNUNET_TIME_Absolute next_challenge;
2595 
2604  struct GNUNET_TIME_Relative challenge_backoff;
2605 
2610  struct GNUNET_TIME_Relative validation_rtt;
2611 
2619  struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2620 
2624  char *address;
2625 
2631  struct GNUNET_CONTAINER_HeapNode *hn;
2632 
2638 
2644  uint32_t last_window_consum_limit;
2645 
2650  int awaiting_queue;
2651 };
2652 
2653 
2661 {
2665  struct GNUNET_PeerIdentity pid;
2666 
2671 
2676 
2681 
2687 
2692 
2698 
2704 
2709  size_t body_size;
2710 };
2711 
2712 
2717 
2721 static unsigned int ring_buffer_head;
2722 
2726 static unsigned int is_ring_buffer_full;
2727 
2732 
2736 static unsigned int ring_buffer_dv_head;
2737 
2741 static unsigned int is_ring_buffer_dv_full;
2742 
2747 
2752 
2757 
2762 
2766 static struct GNUNET_PeerIdentity GST_my_identity;
2767 
2772 
2778 
2784 
2790 
2796 
2802 
2808 
2814 
2819 
2823 static struct LearnLaunchEntry *lle_head = NULL;
2824 
2828 static struct LearnLaunchEntry *lle_tail = NULL;
2829 
2836 
2841 
2846 
2851 
2857 static struct IncomingRequest *ir_head;
2858 
2862 static struct IncomingRequest *ir_tail;
2863 
2867 static unsigned int ir_total;
2868 
2872 static unsigned long long logging_uuid_gen;
2873 
2883 
2888 static int in_shutdown;
2889 
2900 static unsigned int
2902 {
2903  struct GNUNET_TIME_Absolute now;
2904 
2905  now = GNUNET_TIME_absolute_get ();
2906  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2907 }
2908 
2909 
2915 static void
2917 {
2919  GNUNET_assert (ir_total > 0);
2920  ir_total--;
2922  ir->wc = NULL;
2923  GNUNET_free (ir);
2924 }
2925 
2926 
2932 static void
2934 {
2935  struct Queue *q = pa->queue;
2936  struct PendingMessage *pm = pa->pm;
2937  struct DistanceVectorHop *dvh = pa->dvh;
2938 
2940  "free_pending_acknowledgement\n");
2941  if (NULL != q)
2942  {
2943  GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
2944  pa->queue = NULL;
2945  }
2946  if (NULL != pm)
2947  {
2949  "remove pa from message\n");
2951  "remove pa from message %llu\n",
2952  pm->logging_uuid);
2954  "remove pa from message %u\n",
2955  pm->pmt);
2957  "remove pa from message %s\n",
2958  GNUNET_uuid2s (&pa->ack_uuid.value));
2959  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2960  pa->pm = NULL;
2961  }
2962  if (NULL != dvh)
2963  {
2964  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
2965  pa->queue = NULL;
2966  }
2969  &pa->ack_uuid.value,
2970  pa));
2971  GNUNET_free (pa);
2972 }
2973 
2974 
2983 static void
2985 {
2986  struct PendingMessage *frag;
2987 
2988  while (NULL != (frag = root->head_frag))
2989  {
2990  struct PendingAcknowledgement *pa;
2991 
2992  free_fragment_tree (frag);
2993  while (NULL != (pa = frag->pa_head))
2994  {
2995  GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
2996  pa->pm = NULL;
2997  }
2998  GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
2999  if (NULL != frag->qe)
3000  {
3001  GNUNET_assert (frag == frag->qe->pm);
3002  frag->qe->pm = NULL;
3003  }
3005  "Free frag %p\n",
3006  frag);
3007  GNUNET_free (frag);
3008  }
3009 }
3010 
3011 
3019 static void
3021 {
3022  struct TransportClient *tc = pm->client;
3023  struct VirtualLink *vl = pm->vl;
3024  struct PendingAcknowledgement *pa;
3025 
3027  "Freeing pm %p\n",
3028  pm);
3029  if (NULL != tc)
3030  {
3032  tc->details.core.pending_msg_head,
3033  tc->details.core.pending_msg_tail,
3034  pm);
3035  }
3036  if ((NULL != vl) && (NULL == pm->frag_parent))
3037  {
3039  "Removing pm %llu\n",
3040  pm->logging_uuid);
3042  vl->pending_msg_head,
3043  vl->pending_msg_tail,
3044  pm);
3045  }
3046  while (NULL != (pa = pm->pa_head))
3047  {
3048  if (NULL == pa)
3050  "free pending pa null\n");
3051  if (NULL == pm->pa_tail)
3053  "free pending pa_tail null\n");
3054  if (NULL == pa->prev_pa)
3056  "free pending pa prev null\n");
3057  if (NULL == pa->next_pa)
3059  "free pending pa next null\n");
3060  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3061  pa->pm = NULL;
3062  }
3063 
3065  if (NULL != pm->qe)
3066  {
3067  GNUNET_assert (pm == pm->qe->pm);
3068  pm->qe->pm = NULL;
3069  }
3070  if (NULL != pm->bpm)
3071  {
3072  free_fragment_tree (pm->bpm);
3073  GNUNET_free (pm->bpm);
3074  }
3075 
3076  GNUNET_free (pm);
3078  "Freeing pm done\n");
3079 }
3080 
3081 
3087 static void
3089 {
3090  struct VirtualLink *vl = rc->virtual_link;
3091 
3095  rc->msg_uuid.uuid,
3096  rc));
3097  GNUNET_free (rc);
3098 }
3099 
3100 
3106 static void
3108 {
3109  struct VirtualLink *vl = cls;
3110  struct ReassemblyContext *rc;
3111 
3112  vl->reassembly_timeout_task = NULL;
3113  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3114  {
3116  .rel_value_us)
3117  {
3119  continue;
3120  }
3121  GNUNET_assert (NULL == vl->reassembly_timeout_task);
3125  vl);
3126  return;
3127  }
3128 }
3129 
3130 
3139 static int
3140 free_reassembly_cb (void *cls, uint32_t key, void *value)
3141 {
3142  struct ReassemblyContext *rc = value;
3143 
3144  (void) cls;
3145  (void) key;
3147  return GNUNET_OK;
3148 }
3149 
3150 
3156 static void
3158 {
3159  struct PendingMessage *pm;
3160  struct CoreSentContext *csc;
3161 
3163  "free virtual link %p\n",
3164  vl);
3165 
3166  if (NULL != vl->reassembly_map)
3167  {
3170  NULL);
3172  vl->reassembly_map = NULL;
3174  vl->reassembly_heap = NULL;
3175  }
3176  if (NULL != vl->reassembly_timeout_task)
3177  {
3179  vl->reassembly_timeout_task = NULL;
3180  }
3181  while (NULL != (pm = vl->pending_msg_head))
3185  if (NULL != vl->visibility_task)
3186  {
3188  vl->visibility_task = NULL;
3189  }
3190  if (NULL != vl->fc_retransmit_task)
3191  {
3193  vl->fc_retransmit_task = NULL;
3194  }
3195  while (NULL != (csc = vl->csc_head))
3196  {
3198  GNUNET_assert (vl == csc->vl);
3199  csc->vl = NULL;
3200  }
3201  GNUNET_break (NULL == vl->n);
3202  GNUNET_break (NULL == vl->dv);
3203  GNUNET_free (vl);
3204 }
3205 
3206 
3212 static void
3214 {
3215  GNUNET_assert (
3216  GNUNET_YES ==
3219  vs->hn = NULL;
3220  if (NULL != vs->sc)
3221  {
3223  "store cancel\n");
3225  vs->sc = NULL;
3226  }
3227  GNUNET_free (vs->address);
3228  GNUNET_free (vs);
3229 }
3230 
3231 
3238 static struct Neighbour *
3240 {
3242 }
3243 
3244 
3251 static struct VirtualLink *
3253 {
3255 }
3256 
3257 
3262 {
3269 
3273  struct GNUNET_TIME_Relative rtt;
3274 
3279 
3284 
3289 };
3290 
3291 
3300 static void
3302 {
3303  struct Neighbour *n = dvh->next_hop;
3304  struct DistanceVector *dv = dvh->dv;
3305  struct PendingAcknowledgement *pa;
3306 
3307  while (NULL != (pa = dvh->pa_head))
3308  {
3310  pa->dvh = NULL;
3311  }
3312  GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3314  GNUNET_free (dvh);
3315 }
3316 
3317 
3324 static void
3325 check_link_down (void *cls);
3326 
3327 
3333 static void
3335 {
3337  "Informing CORE clients about disconnect from %s\n",
3338  GNUNET_i2s (pid));
3339  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3340  {
3341  struct GNUNET_MQ_Envelope *env;
3342  struct DisconnectInfoMessage *dim;
3343 
3344  if (CT_CORE != tc->type)
3345  continue;
3347  dim->peer = *pid;
3348  GNUNET_MQ_send (tc->mq, env);
3349  }
3350 }
3351 
3352 
3359 static void
3361 {
3362  struct DistanceVectorHop *dvh;
3363 
3364  while (NULL != (dvh = dv->dv_head))
3366  if (NULL == dv->dv_head)
3367  {
3368  struct VirtualLink *vl;
3369 
3370  GNUNET_assert (
3371  GNUNET_YES ==
3373  if (NULL != (vl = dv->vl))
3374  {
3375  GNUNET_assert (dv == vl->dv);
3376  vl->dv = NULL;
3377  if (NULL == vl->n)
3378  {
3380  free_virtual_link (vl);
3381  }
3382  else
3383  {
3386  }
3387  dv->vl = NULL;
3388  }
3389 
3390  if (NULL != dv->timeout_task)
3391  {
3393  dv->timeout_task = NULL;
3394  }
3395  GNUNET_free (dv);
3396  }
3397 }
3398 
3399 
3413 static void
3415  const struct GNUNET_PeerIdentity *peer,
3416  const char *address,
3417  enum GNUNET_NetworkType nt,
3418  const struct MonitorEvent *me)
3419 {
3420  struct GNUNET_MQ_Envelope *env;
3421  struct GNUNET_TRANSPORT_MonitorData *md;
3422  size_t addr_len = strlen (address) + 1;
3423 
3424  env = GNUNET_MQ_msg_extra (md,
3425  addr_len,
3427  md->nt = htonl ((uint32_t) nt);
3428  md->peer = *peer;
3429  md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3430  md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3431  md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3432  md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3433  md->cs = htonl ((uint32_t) me->cs);
3434  md->num_msg_pending = htonl (me->num_msg_pending);
3435  md->num_bytes_pending = htonl (me->num_bytes_pending);
3436  memcpy (&md[1], address, addr_len);
3437  GNUNET_MQ_send (tc->mq, env);
3438 }
3439 
3440 
3450 static void
3452  const char *address,
3453  enum GNUNET_NetworkType nt,
3454  const struct MonitorEvent *me)
3455 {
3456  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3457  {
3458  if (CT_MONITOR != tc->type)
3459  continue;
3460  if (tc->details.monitor.one_shot)
3461  continue;
3462  if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3463  (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3464  continue;
3466  }
3467 }
3468 
3469 
3479 static void *
3481  struct GNUNET_SERVICE_Client *client,
3482  struct GNUNET_MQ_Handle *mq)
3483 {
3484  struct TransportClient *tc;
3485 
3486  (void) cls;
3487  tc = GNUNET_new (struct TransportClient);
3488  tc->client = client;
3489  tc->mq = mq;
3492  "Client %p of type %u connected\n",
3493  tc,
3494  tc->type);
3495  return tc;
3496 }
3497 
3498 
3504 static void
3505 free_neighbour (struct Neighbour *neighbour)
3506 {
3507  struct DistanceVectorHop *dvh;
3508  struct VirtualLink *vl;
3509 
3510  GNUNET_assert (NULL == neighbour->queue_head);
3513  &neighbour->pid,
3514  neighbour));
3516  "Freeing neighbour\n");
3517  while (NULL != (dvh = neighbour->dv_head))
3518  {
3519  struct DistanceVector *dv = dvh->dv;
3520 
3522  if (NULL == dv->dv_head)
3523  free_dv_route (dv);
3524  }
3525  if (NULL != neighbour->get)
3526  {
3527  GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
3528  neighbour->get = NULL;
3529  }
3530  if (NULL != neighbour->sc)
3531  {
3533  "store cancel\n");
3534  GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3535  neighbour->sc = NULL;
3536  }
3537  if (NULL != (vl = neighbour->vl))
3538  {
3539  GNUNET_assert (neighbour == vl->n);
3540  vl->n = NULL;
3541  if (NULL == vl->dv)
3542  {
3545  }
3546  else
3547  {
3550  }
3551  neighbour->vl = NULL;
3552  }
3553  GNUNET_free (neighbour);
3554 }
3555 
3556 
3563 static void
3565  const struct GNUNET_PeerIdentity *pid)
3566 {
3567  struct GNUNET_MQ_Envelope *env;
3568  struct ConnectInfoMessage *cim;
3569 
3570  GNUNET_assert (CT_CORE == tc->type);
3572  cim->id = *pid;
3573  GNUNET_MQ_send (tc->mq, env);
3574 }
3575 
3576 
3582 static void
3584 {
3586  "Informing CORE clients about connection to %s\n",
3587  GNUNET_i2s (pid));
3588  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3589  {
3590  if (CT_CORE != tc->type)
3591  continue;
3593  }
3594 }
3595 
3596 
3604 static void
3605 transmit_on_queue (void *cls);
3606 
3607 
3611 static unsigned int
3613 {
3614  for (struct Queue *s = queue_head; NULL != s;
3615  s = s->next_client)
3616  {
3617  if (s->tc->details.communicator.address_prefix !=
3618  queue->tc->details.communicator.address_prefix)
3619  {
3621  "queue address %s qid %u compare with queue: address %s qid %u\n",
3622  queue->address,
3623  queue->qid,
3624  s->address,
3625  s->qid);
3626  if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3627  (QUEUE_LENGTH_LIMIT > s->queue_length) )
3628  return GNUNET_YES;
3630  "Lower prio\n");
3631  }
3632  }
3633  return GNUNET_NO;
3634 }
3635 
3636 
3644 static void
3646  struct Queue *queue,
3648 {
3650  queue->tc->details.communicator.
3651  queue_head))
3652  return;
3653 
3654  if (queue->tc->details.communicator.total_queue_length >=
3656  {
3658  "Transmission throttled due to communicator queue limit\n");
3660  GST_stats,
3661  "# Transmission throttled due to communicator queue limit",
3662  1,
3663  GNUNET_NO);
3664  queue->idle = GNUNET_NO;
3665  return;
3666  }
3667  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3668  {
3670  "Transmission throttled due to communicator queue length limit\n");
3672  "# Transmission throttled due to queue queue limit",
3673  1,
3674  GNUNET_NO);
3675  queue->idle = GNUNET_NO;
3676  return;
3677  }
3678  if (0 == queue->q_capacity)
3679  {
3681  "Transmission throttled due to communicator message queue qid %u has capacity %"
3682  PRIu64 ".\n",
3683  queue->qid,
3684  queue->q_capacity);
3686  "# Transmission throttled due to message queue capacity",
3687  1,
3688  GNUNET_NO);
3689  queue->idle = GNUNET_NO;
3690  return;
3691  }
3692  /* queue might indeed be ready, schedule it */
3693  if (NULL != queue->transmit_task)
3694  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3695  queue->transmit_task =
3697  queue);
3699  "Considering transmission on queue `%s' QID %llu to %s\n",
3700  queue->address,
3701  (unsigned long long) queue->qid,
3702  GNUNET_i2s (&queue->neighbour->pid));
3703 }
3704 
3705 
3712 static void
3713 check_link_down (void *cls)
3714 {
3715  struct VirtualLink *vl = cls;
3716  struct DistanceVector *dv = vl->dv;
3717  struct Neighbour *n = vl->n;
3718  struct GNUNET_TIME_Absolute dvh_timeout;
3719  struct GNUNET_TIME_Absolute q_timeout;
3720 
3722  "Checking if link is down\n");
3723  vl->visibility_task = NULL;
3724  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3725  if (NULL != dv)
3726  {
3727  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3728  pos = pos->next_dv)
3729  dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3730  pos->path_valid_until);
3731  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3732  {
3733  vl->dv->vl = NULL;
3734  vl->dv = NULL;
3735  }
3736  }
3737  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3738  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3739  q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3740  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3741  {
3742  vl->n->vl = NULL;
3743  vl->n = NULL;
3744  }
3745  if ((NULL == vl->n) && (NULL == vl->dv))
3746  {
3748  free_virtual_link (vl);
3749  return;
3750  }
3751  vl->visibility_task =
3752  GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3753  &check_link_down,
3754  vl);
3755 }
3756 
3757 
3763 static void
3765 {
3766  struct Neighbour *neighbour = queue->neighbour;
3767  struct TransportClient *tc = queue->tc;
3768  struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3770  struct QueueEntry *qe;
3771  int maxxed;
3772  struct PendingAcknowledgement *pa;
3773  struct VirtualLink *vl;
3774 
3776  "Cleaning up queue %u\n", queue->qid);
3777  if (NULL != queue->transmit_task)
3778  {
3779  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3780  queue->transmit_task = NULL;
3781  }
3782  while (NULL != (pa = queue->pa_head))
3783  {
3784  GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3785  pa->queue = NULL;
3786  }
3787 
3788  GNUNET_CONTAINER_MDLL_remove (neighbour,
3789  neighbour->queue_head,
3790  neighbour->queue_tail,
3791  queue);
3793  tc->details.communicator.queue_head,
3794  tc->details.communicator.queue_tail,
3795  queue);
3796  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <=
3797  tc->details.communicator.
3798  total_queue_length);
3800  "Cleaning up queue with length %u\n",
3801  queue->queue_length);
3802  while (NULL != (qe = queue->queue_head))
3803  {
3804  GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3805  queue->queue_length--;
3806  tc->details.communicator.total_queue_length--;
3807  if (NULL != qe->pm)
3808  {
3809  GNUNET_assert (qe == qe->pm->qe);
3810  qe->pm->qe = NULL;
3811  }
3812  GNUNET_free (qe);
3813  }
3815  "Cleaning up queue with length %u\n",
3816  queue->queue_length);
3817  GNUNET_assert (0 == queue->queue_length);
3818  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3819  tc->details.communicator.total_queue_length))
3820  {
3821  /* Communicator dropped below threshold, resume all _other_ queues */
3823  GST_stats,
3824  "# Transmission throttled due to communicator queue limit",
3825  -1,
3826  GNUNET_NO);
3827  for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3828  s = s->next_client)
3830  s,
3832  }
3833  notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3834  GNUNET_free (queue);
3835 
3836  vl = lookup_virtual_link (&neighbour->pid);
3837  if ((NULL != vl) && (neighbour == vl->n))
3838  {
3840  check_link_down (vl);
3841  }
3842  if (NULL == neighbour->queue_head)
3843  {
3844  free_neighbour (neighbour);
3845  }
3846 }
3847 
3848 
3854 static void
3856 {
3857  struct TransportClient *tc = ale->tc;
3858 
3859  GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
3860  tc->details.communicator.addr_tail,
3861  ale);
3862  if (NULL != ale->sc)
3863  {
3865  "store cancel\n");
3867  ale->sc = NULL;
3868  }
3869  if (NULL != ale->st)
3870  {
3871  GNUNET_SCHEDULER_cancel (ale->st);
3872  ale->st = NULL;
3873  }
3874  GNUNET_free (ale);
3875 }
3876 
3877 
3886 static int
3888  const struct GNUNET_PeerIdentity *pid,
3889  void *value)
3890 {
3891  struct TransportClient *tc = cls;
3892  struct PeerRequest *pr = value;
3893 
3895  pr->wc = NULL;
3896  GNUNET_assert (
3897  GNUNET_YES ==
3898  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
3899  pid,
3900  pr));
3901  GNUNET_free (pr);
3902 
3903  return GNUNET_OK;
3904 }
3905 
3906 
3907 static void
3908 do_shutdown (void *cls);
3909 
3918 static void
3920  struct GNUNET_SERVICE_Client *client,
3921  void *app_ctx)
3922 {
3923  struct TransportClient *tc = app_ctx;
3924 
3925  (void) cls;
3926  (void) client;
3928  switch (tc->type)
3929  {
3930  case CT_NONE:
3932  "Unknown Client %p disconnected, cleaning up.\n",
3933  tc);
3934  break;
3935 
3936  case CT_CORE: {
3938  "CORE Client %p disconnected, cleaning up.\n",
3939  tc);
3940 
3941  struct PendingMessage *pm;
3942 
3943  while (NULL != (pm = tc->details.core.pending_msg_head))
3944  {
3946  tc->details.core.pending_msg_head,
3947  tc->details.core.pending_msg_tail,
3948  pm);
3949  pm->client = NULL;
3950  }
3951  }
3952  break;
3953 
3954  case CT_MONITOR:
3956  "MONITOR Client %p disconnected, cleaning up.\n",
3957  tc);
3958 
3959  break;
3960 
3961  case CT_COMMUNICATOR: {
3963  "COMMUNICATOR Client %p disconnected, cleaning up.\n",
3964  tc);
3965 
3966  struct Queue *q;
3967  struct AddressListEntry *ale;
3968 
3969  while (NULL != (q = tc->details.communicator.queue_head))
3970  free_queue (q);
3971  while (NULL != (ale = tc->details.communicator.addr_head))
3973  GNUNET_free (tc->details.communicator.address_prefix);
3974  }
3975  break;
3976 
3977  case CT_APPLICATION:
3979  "APPLICATION Client %p disconnected, cleaning up.\n",
3980  tc);
3981 
3982  GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
3984  tc);
3985  GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
3986  break;
3987  }
3988  GNUNET_free (tc);
3989  if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
3990  {
3992  "Our last client disconnected\n");
3993  do_shutdown (cls);
3994  }
3995 }
3996 
3997 
4007 static int
4009  const struct GNUNET_PeerIdentity *pid,
4010  void *value)
4011 {
4012  struct TransportClient *tc = cls;
4013 
4014  (void) value;
4016  "Telling new CORE client about existing connection to %s\n",
4017  GNUNET_i2s (pid));
4019  return GNUNET_OK;
4020 }
4021 
4022 
4031 static void
4032 handle_client_start (void *cls, const struct StartMessage *start)
4033 {
4034  struct TransportClient *tc = cls;
4035  uint32_t options;
4036 
4037  options = ntohl (start->options);
4038  if ((0 != (1 & options)) &&
4039  (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4040  {
4041  /* client thinks this is a different peer, reject */
4042  GNUNET_break (0);
4043  GNUNET_SERVICE_client_drop (tc->client);
4044  return;
4045  }
4046  if (CT_NONE != tc->type)
4047  {
4048  GNUNET_break (0);
4049  GNUNET_SERVICE_client_drop (tc->client);
4050  return;
4051  }
4052  tc->type = CT_CORE;
4054  "New CORE client with PID %s registered\n",
4055  GNUNET_i2s (&start->self));
4058  tc);
4060 }
4061 
4062 
4069 static int
4070 check_client_send (void *cls, const struct OutboundMessage *obm)
4071 {
4072  struct TransportClient *tc = cls;
4073  uint16_t size;
4074  const struct GNUNET_MessageHeader *obmm;
4075 
4076  if (CT_CORE != tc->type)
4077  {
4078  GNUNET_break (0);
4079  return GNUNET_SYSERR;
4080  }
4081  size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4082  if (size < sizeof(struct GNUNET_MessageHeader))
4083  {
4084  GNUNET_break (0);
4085  return GNUNET_SYSERR;
4086  }
4087  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4088  if (size != ntohs (obmm->size))
4089  {
4090  GNUNET_break (0);
4091  return GNUNET_SYSERR;
4092  }
4093  return GNUNET_OK;
4094 }
4095 
4096 
/**
 * Confirm a pending message to the client that submitted it, if that
 * client is still set on the pending message (pm->client).
 *
 * NOTE(review): the embedded numbering jumps over 4105, 4110, 4117, 4119
 * and 4125.  The signature (which introduces `pm`), the creation of `env`
 * and `so_msg`, the openings of both log calls and the last statement of
 * the function are not visible in this listing.
 */
4104 static void
4106 {
4107  struct TransportClient *tc = pm->client;
4108  struct VirtualLink *vl = pm->vl;
4109 
4111  "client send response\n");
/* without a client there is nobody to notify */
4112  if (NULL != tc)
4113  {
4114  struct GNUNET_MQ_Envelope *env;
4115  struct SendOkMessage *so_msg;
4116 
/* the confirmation names the virtual link's target peer */
4118  so_msg->peer = vl->target;
4120  "Confirming transmission of <%llu> to %s\n",
4121  pm->logging_uuid,
4122  GNUNET_i2s (&vl->target));
4123  GNUNET_MQ_send (tc->mq, env);
4124  }
4126 }
4127 
4128 
/**
 * Select up to @a hops_array_length distance-vector hops from dv->dv_head
 * into @a hops_array, weighting each hop by
 * (MAX_DV_HOPS_ALLOWED - distance) so that shorter paths get more weight.
 *
 * Unless RMO_UNCONFIRMED_ALLOWED is set in `options`, hops whose
 * path_valid_until has no remaining time are excluded from counting and
 * from the weighted selection.
 *
 * NOTE(review): the embedded numbering jumps over 4139-4140 and 4178: the
 * function-name line, the parameters introducing `dv` and `options`, and
 * the right-hand side that draws choices[i] are not visible here.
 *
 * Returns the number of entries written to @a hops_array (0 if no hop
 * qualifies).
 */
4138 static unsigned int
4141  struct DistanceVectorHop **hops_array,
4142  unsigned int hops_array_length)
4143 {
4144  uint64_t choices[hops_array_length];
4145  uint64_t num_dv;
4146  unsigned int dv_count;
4147 
4148  /* Pick random vectors, but weighted by distance, giving more weight
4149  to shorter vectors */
/* pass 1: count eligible hops and sum their weights */
4150  num_dv = 0;
4151  dv_count = 0;
4152  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4153  pos = pos->next_dv)
4154  {
4155  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4156  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4157  .rel_value_us == 0))
4158  continue; /* pos unconfirmed and confirmed required */
4159  num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4160  dv_count++;
4161  }
4162  if (0 == dv_count)
4163  return 0;
/* shortcut: no more eligible hops than slots, so no random choice needed.
   NOTE(review): this copy loop does not repeat the validity filter used
   while counting, so it can return filtered-out hops and, if the list
   holds more hops than hops_array_length, write past the end of
   hops_array -- verify and fix upstream. */
4164  if (dv_count <= hops_array_length)
4165  {
4166  dv_count = 0;
4167  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4168  pos = pos->next_dv)
4169  hops_array[dv_count++] = pos;
4170  return dv_count;
4171  }
/* draw hops_array_length pairwise-distinct values into choices[] */
4172  for (unsigned int i = 0; i < hops_array_length; i++)
4173  {
4174  int ok = GNUNET_NO;
4175  while (GNUNET_NO == ok)
4176  {
4177  choices[i] =
4179  ok = GNUNET_YES;
4180  for (unsigned int j = 0; j < i; j++)
4181  if (choices[i] == choices[j])
4182  {
4183  ok = GNUNET_NO;
4184  break;
4185  }
4186  }
4187  }
/* pass 2: map each drawn value to the hop whose weight interval
   [num_dv, num_dv + delta) contains it */
4188  dv_count = 0;
4189  num_dv = 0;
4190  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4191  pos = pos->next_dv)
4192  {
4193  uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4194 
4195  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4196  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4197  .rel_value_us == 0))
4198  continue; /* pos unconfirmed and confirmed required */
4199  for (unsigned int i = 0; i < hops_array_length; i++)
4200  if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4201  hops_array[dv_count++] = pos;
4202  num_dv += delta;
4203  }
4204  return dv_count;
4205 }
4206 
4207 
/**
 * Validation step for a "communicator available" message.
 *
 * Rejects the message if the client already has a type other than
 * CT_NONE.  Otherwise it marks the client as CT_COMMUNICATOR (note that
 * this check function therefore modifies client state) and accepts
 * messages with no trailing bytes as receive-only communicators.
 *
 * NOTE(review): the embedded numbering jumps over 4215 and 4231; the
 * function-name line and a statement before the final return (presumably
 * a check of the trailing bytes) are not visible here.
 *
 * Returns #GNUNET_OK or #GNUNET_SYSERR.
 */
4214 static int
4216  void *cls,
4217  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4218 {
4219  struct TransportClient *tc = cls;
4220  uint16_t size;
4221 
4222  if (CT_NONE != tc->type)
4223  {
4224  GNUNET_break (0);
4225  return GNUNET_SYSERR;
4226  }
4227  tc->type = CT_COMMUNICATOR;
/* number of bytes following the fixed-size message */
4228  size = ntohs (cam->header.size) - sizeof(*cam);
4229  if (0 == size)
4230  return GNUNET_OK; /* receive-only communicator */
4232  return GNUNET_OK;
4233 }
4234 
4235 
/**
 * Finish processing of a communicator message context: if the incoming
 * message has flow control enabled (im.fc_on != 0), queue an ACK carrying
 * its fc_id and sender on the communicator's message queue, then free the
 * context.
 *
 * NOTE(review): the embedded numbering jumps over 4242, 4252, 4255 and
 * 4264; the function-name line (which introduces `cmc`), the opening of
 * the log call, the creation of `env`/`ack` and the body of the
 * `continue_client` branch are not visible in this listing.
 *
 * continue_client: the hidden branch body runs when this equals
 *                  #GNUNET_YES
 */
4241 static void
4243  unsigned
4244  int continue_client)
4245 {
4246  if (0 != ntohl (cmc->im.fc_on))
4247  {
4248  /* send ACK when done to communicator for flow control! */
4249  struct GNUNET_MQ_Envelope *env;
4250  struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4251 
4253  "Acknowledge message with flow control id %" PRIu64 "\n",
4254  cmc->im.fc_id);
4256  ack->reserved = htonl (0);
/* fc_id and sender are copied from the incoming message unchanged */
4257  ack->fc_id = cmc->im.fc_id;
4258  ack->sender = cmc->im.neighbour_sender;
4259  GNUNET_MQ_send (cmc->tc->mq, env);
4260  }
4261 
4262  if (GNUNET_YES == continue_client)
4263  {
4265  }
/* the context is owned by this function from here on */
4266  GNUNET_free (cmc);
4267 }
4268 
4269 
/* NOTE(review): the embedded numbering jumps over 4271 and 4273, so both
   the signature and the single body statement of this helper are missing
   from the listing.  Presumably this is finish_cmc_handling(), which is
   called with one argument elsewhere in this file -- confirm against the
   real file. */
4270 static void
4272 {
4274 }
4275 
4276 
4286 static void
4287 handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4288 {
4289  struct TransportClient *tc = cls;
4290  struct VirtualLink *vl;
4291  uint32_t delta;
4292  struct CommunicatorMessageContext *cmc;
4293 
4294  if (CT_CORE != tc->type)
4295  {
4296  GNUNET_break (0);
4297  GNUNET_SERVICE_client_drop (tc->client);
4298  return;
4299  }
4300  vl = lookup_virtual_link (&rom->peer);
4301  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4302  {
4304  "# RECV_OK dropped: virtual link unknown",
4305  1,
4306  GNUNET_NO);
4308  return;
4309  }
4310  delta = ntohl (rom->increase_window_delta);
4311  vl->core_recv_window += delta;
4313  "CORE ack receiving message, increased CORE recv window to %u\n",
4314  vl->core_recv_window);
4316  if (vl->core_recv_window <= 0)
4317  return;
4318  /* resume communicators */
4319  while (NULL != (cmc = vl->cmc_tail))
4320  {
4322  finish_cmc_handling (cmc);
4323  }
4324 }
4325 
4326 
/**
 * Handle a "communicator available" message: remember the address prefix
 * (the 0-terminated string following the message) and the communicator
 * characteristics in the client's communicator details.
 *
 * A message without trailing bytes denotes a receive-only communicator;
 * nothing is stored for it.
 *
 * NOTE(review): the embedded numbering jumps over 4334, 4344, 4352 and
 * 4355; the function-name line, the openings of both log calls and the
 * final statement are not visible in this listing.
 */
4333 static void
4335  void *cls,
4336  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4337 {
4338  struct TransportClient *tc = cls;
4339  uint16_t size;
4340 
4341  size = ntohs (cam->header.size) - sizeof(*cam);
4342  if (0 == size)
4343  {
4345  "Receive-only communicator connected\n");
4346  return; /* receive-only communicator */
4347  }
/* the prefix is duplicated on the heap; NOTE(review): where it is
   released is not visible in this chunk */
4348  tc->details.communicator.address_prefix =
4349  GNUNET_strdup ((const char *) &cam[1]);
4350  tc->details.communicator.cc =
4351  (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
4353  "Communicator with prefix `%s' connected\n",
4354  tc->details.communicator.address_prefix);
4356 }
4357 
4358 
4366 static int
4368  void *cls,
4369  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4370 {
4371  const struct GNUNET_MessageHeader *inbox;
4372  const char *is;
4373  uint16_t msize;
4374  uint16_t isize;
4375 
4376  (void) cls;
4377  msize = ntohs (cb->header.size) - sizeof(*cb);
4378  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
4379  isize = ntohs (inbox->size);
4380  if (isize >= msize)
4381  {
4382  GNUNET_break (0);
4383  return GNUNET_SYSERR;
4384  }
4385  is = (const char *) inbox;
4386  is += isize;
4387  msize -= isize;
4388  GNUNET_assert (0 < msize);
4389  if ('\0' != is[msize - 1])
4390  {
4391  GNUNET_break (0);
4392  return GNUNET_SYSERR;
4393  }
4394  return GNUNET_OK;
4395 }
4396 
4397 
/**
 * Refresh the ephemeral-key material of a distance vector (`dv`) and sign
 * it: sets dv->ephemeral_validity, fills a `struct EphemeralConfirmationPS`
 * with the target and ephemeral key, and stores the resulting signature
 * in dv->sender_sig.  Returns early if the (partly hidden) condition at
 * the top is non-zero.
 *
 * NOTE(review): the embedded numbering jumps over 4405, 4410, 4412,
 * 4414-4417, 4420 and 4422, so most of this function is missing from the
 * listing: the signature, the early-return condition, key generation,
 * the validity computation, part of the `ec` setup and the opening of
 * the signing call.
 */
4404 static void
4406 {
4407  struct EphemeralConfirmationPS ec;
4408 
4409  if (0 !=
4411  return;
4413  dv->ephemeral_validity =
4418  ec.target = dv->target;
4419  ec.ephemeral_key = dv->ephemeral_key;
/* the signed purpose covers the whole `ec` structure */
4421  ec.purpose.size = htonl (sizeof(ec));
4423  &ec,
4424  &dv->sender_sig);
4425 }
4426 
4427 
/**
 * Hand @a payload to the communicator behind `queue` for transmission.
 *
 * Builds a `GNUNET_TRANSPORT_SendMessageTo` envelope carrying the queue
 * id, a fresh message id (queue->mid_gen) and the payload; creates a
 * `struct QueueEntry` that links the queue and (if given) @a pm; updates
 * queue length, capacity and idle state; and sends the envelope on the
 * communicator's message queue.
 *
 * NOTE(review): the embedded numbering jumps over 4438, 4449, 4457, 4468,
 * 4483, 4497, 4508, 4513 and 4528: the function-name line (introducing
 * `queue`), the first argument of several log calls, the message-type
 * argument of GNUNET_MQ_msg_extra and the first half of one condition
 * are not visible here.
 *
 * pm:           pending message being sent, or NULL
 * payload:      bytes to transmit; the final log reads it as a
 *               `struct GNUNET_MessageHeader`
 * payload_size: number of bytes in @a payload
 */
4437 static void
4439  struct PendingMessage *pm,
4440  const void *payload,
4441  size_t payload_size)
4442 {
4443  struct Neighbour *n = queue->neighbour;
4444  struct GNUNET_TRANSPORT_SendMessageTo *smt;
4445  struct GNUNET_MQ_Envelope *env;
4446  struct PendingAcknowledgement *pa;
4447 
4448  GNUNET_log (
4450  "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
4451  (unsigned int) payload_size,
4452  (NULL == pm) ? 0 : pm->logging_uuid,
4453  (unsigned long long) queue->qid,
4454  GNUNET_i2s (&queue->neighbour->pid));
4455  env = GNUNET_MQ_msg_extra (smt,
4456  payload_size,
4458  smt->qid = htonl (queue->qid);
4459  smt->mid = GNUNET_htonll (queue->mid_gen);
4460  smt->receiver = n->pid;
4461  memcpy (&smt[1], payload, payload_size);
4462  {
4463  /* Pass the env to the communicator of queue for transmission. */
4464  struct QueueEntry *qe;
4465 
4466  qe = GNUNET_new (struct QueueEntry);
4467  qe->mid = queue->mid_gen;
4469  "Create QueueEntry with MID %" PRIu64
4470  " and QID %u and prefix %s\n",
4471  qe->mid,
4472  queue->qid,
4473  queue->tc->details.communicator.address_prefix);
4474  queue->mid_gen++;
4475  qe->queue = queue;
4476  if (NULL != pm)
4477  {
4478  qe->pm = pm;
4479  // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4480  // GNUNET_assert (NULL == pm->qe);
/* a previous queue entry for pm is detached, not freed, here */
4481  if (NULL != pm->qe)
4482  {
4484  "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4485  pm->logging_uuid,
4486  (unsigned long long) pm->qe->mid);
4487  pm->qe->pm = NULL;
4488  }
4489  pm->qe = qe;
4490  }
4491  GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4492  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
/* NOTE(review): on this early return `qe` is already in the queue's list
   and linked to pm, but queue_length is not incremented; also `env` is
   released with GNUNET_free although it is a GNUNET_MQ_Envelope --
   presumably GNUNET_MQ_discard is the intended call; confirm. */
4493  if (0 == queue->q_capacity)
4494  {
4495  // Messages without FC or fragments can get here.
4496  if (NULL != pm)
4498  "Message %llu (pm type %u) was not send because queue has no capacity.\n",
4499  pm->logging_uuid,
4500  pm->pmt);
4501  GNUNET_free (env);
4502  return;
4503  }
4504  queue->queue_length++;
4505  queue->tc->details.communicator.total_queue_length++;
/* capacity is only consumed on queues that are not unlimited */
4506  if (GNUNET_NO == queue->unlimited_length)
4507  queue->q_capacity--;
4509  "Queue %s with qid %u has capacity %" PRIu64 "\n",
4510  queue->address,
4511  queue->qid,
4512  queue->q_capacity);
/* any of the following three limits marks the queue as not idle */
4514  queue->tc->details.communicator.total_queue_length)
4515  queue->idle = GNUNET_NO;
4516  if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4517  queue->idle = GNUNET_NO;
4518  if (0 == queue->q_capacity)
4519  queue->idle = GNUNET_NO;
4520 
/* count this send on the pending acknowledgement that belongs to pm.
   NOTE(review): the walk has no NULL check, so it relies on the list
   containing an entry whose ->pm equals pm -- confirm. */
4521  if (NULL != pm && NULL != (pa = pm->pa_head))
4522  {
4523  while (pm != pa->pm)
4524  pa = pa->next_pa;
4525  pa->num_send++;
4526  }
4527  // GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
4529  "Sending message MID %" PRIu64
4530  " of type %u (%u) and size %lu with MQ %p QID %u\n",
4531  GNUNET_ntohll (smt->mid),
4532  ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4533  ntohs (smt->header.size),
4534  (unsigned long) payload_size,
4535  queue->tc->mq,
4536  queue->qid);
4537  GNUNET_MQ_send (queue->tc->mq, env);
4538  }
4539 }
4540 
4541 
/**
 * Send @a hdr to neighbour @a n over one or two of its queues.
 *
 * A queue is a candidate if RMO_UNCONFIRMED_ALLOWED is set in `options`
 * or its validated_until lies in the future.  The queues at candidate
 * indices sel1 and sel2 are used; without RMO_REDUNDANT, sel2 is set to
 * an index that matches no candidate.
 *
 * NOTE(review): the embedded numbering jumps over 4555, 4581, 4585, 4589,
 * 4592-4593 and 4597: the `options` parameter line, the openings of the
 * log/statistics calls, the statement ending the "no candidates" branch
 * (presumably a return), the initialisation of `rtt` and `sel1`, and the
 * `else` assignment of `sel2` are not visible in this listing.
 *
 * n:   neighbour to send to
 * hdr: message to send; its size field gives the payload length
 * Returns `rtt`, which the loop lowers to the minimum of itself and the
 * aged RTT of each queue used.
 */
4552 static struct GNUNET_TIME_Relative
4553 route_via_neighbour (const struct Neighbour *n,
4554  const struct GNUNET_MessageHeader *hdr,
4556 {
4557  struct GNUNET_TIME_Absolute now;
4558  unsigned int candidates;
4559  unsigned int sel1;
4560  unsigned int sel2;
4561  struct GNUNET_TIME_Relative rtt;
4562 
4563  /* Pick one or two 'random' queues from n (under constraints of options) */
4564  now = GNUNET_TIME_absolute_get ();
4565  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4566  weight in the future; weight could be assigned by observed
4567  bandwidth (note: not sure if we should do this for this type
4568  of control traffic though). */
/* pass 1: count the usable queues */
4569  candidates = 0;
4570  for (struct Queue *pos = n->queue_head; NULL != pos;
4571  pos = pos->next_neighbour)
4572  {
4573  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4574  (pos->validated_until.abs_value_us > now.abs_value_us))
4575  candidates++;
4576  }
4577  if (0 == candidates)
4578  {
4579  /* This can happen rarely if the last confirmed queue timed
4580  out just as we were beginning to process this message. */
4582  "Could not route message of type %u to %s: no valid queue\n",
4583  ntohs (hdr->type),
4584  GNUNET_i2s (&n->pid));
4586  "# route selection failed (all no valid queue)",
4587  1,
4588  GNUNET_NO);
4590  }
4591 
4594  if (0 == (options & RMO_REDUNDANT))
4595  sel2 = candidates; /* picks none! */
4596  else
/* pass 2: re-enumerate the candidates and send on the selected ones */
4598  candidates = 0;
4599  for (struct Queue *pos = n->queue_head; NULL != pos;
4600  pos = pos->next_neighbour)
4601  {
4602  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4603  (pos->validated_until.abs_value_us > now.abs_value_us))
4604  {
4605  if ((sel1 == candidates) || (sel2 == candidates))
4606  {
4608  "Routing message of type %u to %s using %s (#%u)\n",
4609  ntohs (hdr->type),
4610  GNUNET_i2s (&n->pid),
4611  pos->address,
4612  (sel1 == candidates) ? 1 : 2);
4613  rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4614  queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4615  }
4616  candidates++;
4617  }
4618  }
4619  return rtt;
4620 }
4621 
4622 
/* Key state used by the dv_* helpers in this file: a libgcrypt cipher
   handle plus derived key material.
   NOTE(review): the embedded numbering jumps over 4626, 4638-4641 and
   4652 (and over what are probably stripped doc comments): the
   `struct ...` header line, at least one member of the inner struct
   (other code reads `material.hmac_key`) and the line closing the inner
   struct are not visible in this listing. */
4627 {
/* cipher handle */
4631  gcry_cipher_hd_t cipher;
4632 
/* derived key material (accessed as `material` elsewhere in the file) */
4636  struct
4637  {
4642 
/* 256-bit AES key */
4646  char aes_key[256 / 8];
4647 
/* 128-bit counter value for CTR mode */
4651  char aes_ctr[128 / 8];
4653 };
4654 
4655 
/**
 * Initialise a `struct DVKeyState` from key material `km` and @a iv.
 *
 * Runs a KDF (salt string "transport-backchannel-key", inputs `km` and
 * @a iv) to fill key->material, then opens an AES-256 cipher in CTR mode
 * and loads the derived AES key and counter into it.  Opening the cipher
 * and setting the key are asserted to succeed.
 *
 * NOTE(review): the embedded numbering jumps over 4665, 4670 and 4680;
 * the function-name line (introducing `km`), the statement wrapping the
 * KDF call (its result handling is therefore not visible) and the
 * opening of the log call are missing from this listing.  The result of
 * gcry_cipher_setctr is not checked in the visible code.
 *
 * iv:  initialisation vector fed into the KDF
 * key: key state to initialise
 */
4664 static void
4666  const struct GNUNET_ShortHashCode *iv,
4667  struct DVKeyState *key)
4668 {
4669  /* must match #dh_key_derive_eph_pub */
4671  GNUNET_CRYPTO_kdf (&key->material,
4672  sizeof(key->material),
4673  "transport-backchannel-key",
4674  strlen ("transport-backchannel-key"),
4675  km,
4676  sizeof(*km),
4677  iv,
4678  sizeof(*iv),
4679  NULL));
4681  "Deriving backchannel key based on KM %s and IV %s\n",
4682  GNUNET_h2s (km),
4683  GNUNET_sh2s (iv));
4684  GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4685  GCRY_CIPHER_AES256 /* low level: go for speed */,
4686  GCRY_CIPHER_MODE_CTR,
4687  0 /* flags */));
4688  GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4689  &key->material.aes_key,
4690  sizeof(key->material.aes_key)));
4691  gcry_cipher_setctr (key->cipher,
4692  &key->material.aes_ctr,
4693  sizeof(key->material.aes_ctr));
4694 }
4695 
4696 
4707 static enum GNUNET_GenericReturnValue
4709  const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
4710  const struct GNUNET_PeerIdentity *target,
4711  const struct GNUNET_ShortHashCode *iv,
4712  struct DVKeyState *key)
4713 {
4714  struct GNUNET_HashCode km;
4715 
4716  if (GNUNET_YES != GNUNET_CRYPTO_ecdh_eddsa (priv_ephemeral,
4717  &target->public_key,
4718  &km))
4719  return GNUNET_SYSERR;
4720  // FIXME: Possibly also add return values here. We are processing
4721  // Input from other peers...
4722  dv_setup_key_state_from_km (&km, iv, key);
4723  return GNUNET_OK;
4724 }
4725 
4726 
/**
 * Derive the backchannel key state from a peer's ephemeral public key.
 *
 * NOTE(review): the embedded numbering jumps over 4744, so the opening of
 * the key-exchange condition (the function called and its first argument)
 * is not visible; only its last two arguments are.
 *
 * pub_ephemeral: ephemeral public key received from the other side
 * iv:            initialisation vector passed on to the key setup
 * key:           key state to initialise
 * Returns #GNUNET_OK on success, #GNUNET_SYSERR if the hidden key-exchange
 * condition holds.
 */
4737 static enum GNUNET_GenericReturnValue
4738 dh_key_derive_eph_pub (const struct GNUNET_CRYPTO_EcdhePublicKey *pub_ephemeral,
4739  const struct GNUNET_ShortHashCode *iv,
4740  struct DVKeyState *key)
4741 {
4742  struct GNUNET_HashCode km;
4743 
4745  pub_ephemeral,
4746  &km))
4747  return GNUNET_SYSERR;
/* same key setup as in the private-key variant */
4748  dv_setup_key_state_from_km (&km, iv, key);
4749  return GNUNET_OK;
4750 }
4751 
4752 
4762 static void
4763 dv_hmac (const struct DVKeyState *key,
4764  struct GNUNET_HashCode *hmac,
4765  const void *data,
4766  size_t data_size)
4767 {
4768  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4769 }
4770 
4771 
4781 static void
4782 dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4783 {
4784  GNUNET_assert (0 ==
4785  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4786 }
4787 
4788 
4799 static enum GNUNET_GenericReturnValue
4800 dv_decrypt (struct DVKeyState *key,
4801  void *out,
4802  const void *ciph,
4803  size_t out_size)
4804 {
4805  return (0 ==
4806  gcry_cipher_decrypt (key->cipher,
4807  out, out_size,
4808  ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
4809 }
4810 
4811 
/**
 * Release a key state: close the cipher handle and wipe the derived key
 * material.  The `key` structure itself is not freed here.
 *
 * NOTE(review): the embedded numbering jumps over 4818, so the signature
 * line (function name and the `key` parameter) is not visible in this
 * listing.
 */
4817 static void
4819 {
4820  gcry_cipher_close (key->cipher);
4821  GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
4822 }
4823 
4824 
/* Callback type used to hand a message to a next hop: receives a closure,
   the next-hop neighbour and the message.
   NOTE(review): the embedded numbering jumps over 4838, so the final
   parameter line and the closing of this typedef are not visible here;
   callers in this file pass a fourth `options` argument. */
4835 typedef void (*DVMessageHandler) (void *cls,
4836  struct Neighbour *next_hop,
4837  const struct GNUNET_MessageHeader *hdr,
4839 
/**
 * Encrypt @a hdr once into a DV box payload and send one boxed copy along
 * each of the @a num_dvhs paths in @a dvhs by calling @a use.
 *
 * The encrypted body and the HMAC over it are computed once; for every
 * path a buffer is assembled as: box header, the path's peer identities
 * followed by the target, then the encrypted payload.
 *
 * NOTE(review): the embedded numbering jumps over 4855, 4861, 4873, 4880,
 * 4884, 4897, 4923 and 4932: the function-name line (introducing `dv`),
 * the `options` parameter, the message-type assignment, the opening of
 * the IV generation, the wrapper around the key derivation, the
 * initialisation of `rtt`, the initialisation of `path` and the opening
 * of the log call are not visible in this listing.
 *
 * num_dvhs:   number of entries in @a dvhs
 * dvhs:       paths to use
 * hdr:        message to encapsulate; its size field gives the body length
 * use:        callback invoked once per path with the boxed message
 * use_cls:    closure for @a use
 * without_fc: stored (in network byte order) in the box header
 * Returns `rtt`, lowered per path to the minimum of itself and the
 * path's aged RTT.
 */
4854 static struct GNUNET_TIME_Relative
4856  unsigned int num_dvhs,
4857  struct DistanceVectorHop **dvhs,
4858  const struct GNUNET_MessageHeader *hdr,
4859  DVMessageHandler use,
4860  void *use_cls,
4862  enum GNUNET_GenericReturnValue without_fc)
4863 {
4864  struct TransportDVBoxMessage box_hdr;
4865  struct TransportDVBoxPayloadP payload_hdr;
4866  uint16_t enc_body_size = ntohs (hdr->size);
4867  char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
4868  struct DVKeyState *key;
4869  struct GNUNET_TIME_Relative rtt;
4870 
4871  key = GNUNET_new (struct DVKeyState);
4872  /* Encrypt payload */
4874  box_hdr.total_hops = htons (0);
4875  box_hdr.without_fc = htons (without_fc);
4876  update_ephemeral (dv);
4877  box_hdr.ephemeral_key = dv->ephemeral_key;
4878  payload_hdr.sender_sig = dv->sender_sig;
4879 
4881  &box_hdr.iv,
4882  sizeof(box_hdr.iv));
4883  // We are creating this key, so this must work.
4885  dh_key_derive_eph_pid (&dv->private_key,
4886  &dv->target,
4887  &box_hdr.iv, key));
4888  payload_hdr.sender = GST_my_identity;
4889  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
/* payload header and body are encrypted back to back with the same
   cipher state */
4890  dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
4891  dv_encrypt (key,
4892  hdr,
4893  &enc[sizeof(struct TransportDVBoxPayloadP)],
4894  enc_body_size);
/* the HMAC covers the ciphertext */
4895  dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
4896  dv_key_clean (key);
4898  /* For each selected path, take the pre-computed header and body
4899  and add the path in the middle of the message; then send it. */
4900  for (unsigned int i = 0; i < num_dvhs; i++)
4901  {
4902  struct DistanceVectorHop *dvh = dvhs[i];
4903  unsigned int num_hops = dvh->distance + 1;
4904  char buf[sizeof(struct TransportDVBoxMessage)
4905  + sizeof(struct GNUNET_PeerIdentity) * num_hops
4906  + sizeof(struct TransportDVBoxPayloadP)
4907  + enc_body_size] GNUNET_ALIGN;
4908  struct GNUNET_PeerIdentity *dhops;
4909 
4910  box_hdr.header.size = htons (sizeof(buf));
4911  box_hdr.orig_size = htons (sizeof(buf));
4912  box_hdr.num_hops = htons (num_hops);
4913  memcpy (buf, &box_hdr, sizeof(box_hdr));
4914  dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
4915  memcpy (dhops,
4916  dvh->path,
4917  dvh->distance * sizeof(struct GNUNET_PeerIdentity));
/* the final hop is always the target itself */
4918  dhops[dvh->distance] = dv->target;
4919  if (GNUNET_EXTRA_LOGGING > 0)
4920  {
4921  char *path;
4922 
4924  for (unsigned int j = 0; j < num_hops; j++)
4925  {
4926  char *tmp;
4927 
4928  GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
4929  GNUNET_free (path);
4930  path = tmp;
4931  }
4933  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
4934  ntohs (hdr->type),
4935  GNUNET_i2s (&dv->target),
4936  i + 1,
4937  num_dvhs,
4938  path);
4939  GNUNET_free (path);
4940  }
4941  rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
4942  memcpy (&dhops[num_hops], enc, sizeof(enc));
4943  use (use_cls,
4944  dvh->next_hop,
4945  (const struct GNUNET_MessageHeader *) buf,
4946  options);
/* NOTE(review): `key` is allocated once before the loop but freed here
   on every iteration: with num_dvhs > 1 this looks like a double free,
   and with num_dvhs == 0 the allocation is never released.  The free
   presumably belongs after dv_key_clean() above -- confirm and fix. */
4947  GNUNET_free (key);
4948  }
4949  return rtt;
4950 }
4951 
4952 
/**
 * Callback that forwards @a hdr to @a next_hop via route_via_neighbour(),
 * always passing RMO_UNCONFIRMED_ALLOWED; the closure and the returned
 * RTT are ignored.
 *
 * NOTE(review): the embedded numbering jumps over 4963 and 4966; the
 * function-name line (with the `cls` parameter) and the last parameter
 * line are not visible in this listing.
 */
4962 static void
4964  struct Neighbour *next_hop,
4965  const struct GNUNET_MessageHeader *hdr,
4967 {
4968  (void) cls;
4969  (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
4970 }
4971 
4972 
/**
 * Route control message @a hdr to the target of virtual link `vl` without
 * flow control, using the direct neighbour, a distance-vector path, or
 * both, depending on `options` and on what the link offers.
 *
 * If both routes are available and RMO_REDUNDANT is not set, one of them
 * is chosen by a (hidden) random condition; if both are used, the
 * RMO_REDUNDANT flag is cleared because the two routes already provide
 * the redundancy.
 *
 * NOTE(review): the embedded numbering jumps over 4985, 4988, 4997, 5006,
 * 5016, 5020, 5024, 5028, 5030, 5039, 5047-5048, 5051, 5062, 5068, 5072
 * and 5078.  The signature lines (introducing `vl` and `options`), the
 * statements of several branches (including, presumably, early returns
 * and the initialisation of `rtt1`/`rtt2`), the coin-flip condition, the
 * call assigning `res` and one argument of encapsulate_for_dv() are not
 * visible in this listing.
 *
 * hdr: message to route
 * Returns the minimum of the RTT values reported by the routes used
 * (`rtt1` alone if no DV path could be determined).
 */
4984 static struct GNUNET_TIME_Relative
4986 // route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4987  const struct GNUNET_MessageHeader *hdr,
4989 {
4990  // struct VirtualLink *vl;
4991  struct Neighbour *n;
4992  struct DistanceVector *dv;
4993  struct GNUNET_TIME_Relative rtt1;
4994  struct GNUNET_TIME_Relative rtt2;
4995  const struct GNUNET_PeerIdentity *target = &vl->target;
4996 
4998  "Trying to route message of type %u to %s without fc\n",
4999  ntohs (hdr->type),
5000  GNUNET_i2s (target));
5001 
5002  // TODO Do this elsewhere. vl should be given as parameter to method.
5003  // vl = lookup_virtual_link (target);
/* NOTE(review): `vl` has already been used above (for `target` and the
   log line) before this assertion runs, and the NULL test that follows
   cannot be true once the assertion has passed. */
5004  GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
5005  if (NULL == vl)
5007  n = vl->n;
5008  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
5009  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
5010  {
5011  /* if confirmed is required, and we do not have anything
5012  confirmed, drop respective options */
5013  if (NULL == n)
5014  n = lookup_neighbour (target);
5015  if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
5017  }
5018  if ((NULL == n) && (NULL == dv))
5019  {
5021  "Cannot route message of type %u to %s: no route\n",
5022  ntohs (hdr->type),
5023  GNUNET_i2s (target));
5025  "# Messages dropped in routing: no acceptable method",
5026  1,
5027  GNUNET_NO);
5029  }
5031  "Routing message of type %u to %s with options %X\n",
5032  ntohs (hdr->type),
5033  GNUNET_i2s (target),
5034  (unsigned int) options);
5035  /* If both dv and n are possible and we must choose:
5036  flip a coin for the choice between the two; for now 50/50 */
5037  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5038  {
5040  n = NULL;
5041  else
5042  dv = NULL;
5043  }
5044  if ((NULL != n) && (NULL != dv))
5045  options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5046  enough for redundancy, so clear the flag. */
5049  if (NULL != n)
5050  {
5052  "Try to route message of type %u to %s without fc via neighbour\n",
5053  ntohs (hdr->type),
5054  GNUNET_i2s (target));
5055  rtt1 = route_via_neighbour (n, hdr, options);
5056  }
5057  if (NULL != dv)
5058  {
/* room for at most two paths: one normally, two with RMO_REDUNDANT */
5059  struct DistanceVectorHop *hops[2];
5060  unsigned int res;
5061 
5063  options,
5064  hops,
5065  (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5066  if (0 == res)
5067  {
5069  "Failed to route message, could not determine DV path\n");
5070  return rtt1;
5071  }
5073  "encapsulate_for_dv 1\n");
5074  rtt2 = encapsulate_for_dv (dv,
5075  res,
5076  hops,
5077  hdr,
5079  NULL,
5080  options & (~RMO_REDUNDANT),
5081  GNUNET_YES);
5082  }
5083  return GNUNET_TIME_relative_min (rtt1, rtt2);
5084 }
5085 
5086 
/* Forward declaration: needed because the retransmission task wrapper
   defined next calls this function before its definition. */
5087 static void
5088 consider_sending_fc (void *cls);
5089 
/**
 * Task wrapper around consider_sending_fc(): clears the virtual link's
 * fc_retransmit_task handle (the task is now running) and then calls
 * consider_sending_fc() with the same closure.
 *
 * NOTE(review): the embedded numbering jumps over 5097, so the signature
 * line (function name and the `cls` parameter) is not visible here.
 */
5096 static void
5098 {
5099  struct VirtualLink *vl = cls;
5100  vl->fc_retransmit_task = NULL;
5101  consider_sending_fc (cls);
5102 }
5103 
5104 
/**
 * Build and send a flow-control (FC) message for the virtual link given
 * as closure, and (re)arm the FC retransmission task.
 *
 * Visible effects: logs the FC sequence number and window, records the
 * transmission time in vl->last_fc_transmission, fills `fc` (size,
 * sequence number from vl->fc_seq_gen, sender time), stores the measured
 * `rtt` in vl->last_fc_rtt unless it equals "forever", and increments
 * vl->fc_retransmit_count after scheduling.
 *
 * NOTE(review): the embedded numbering jumps over 5112, 5117, 5120, 5130,
 * 5135, 5137, 5140-5144, 5146, 5149-5150, 5153-5154, 5162-5163, 5165 and
 * 5169, so a large part of this function is missing from the listing:
 * the signature, the declaration of `duration`, how `monotime` and `rtt`
 * are obtained, most `fc` fields, the failure-branch logic and the
 * cancel/reschedule calls.  Treat the summary above as partial.
 */
5111 static void
5113 {
5114  struct VirtualLink *vl = cls;
5115  struct GNUNET_TIME_Absolute monotime;
5116  struct TransportFlowControlMessage fc;
5118  struct GNUNET_TIME_Relative rtt;
5119 
5121  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5122  it always! */
5123  /* For example, we should probably ONLY do this if a bit more than
5124  an RTT has passed, or if the window changed "significantly" since
5125  then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5126  need an estimate for the bandwidth-delay-product for the entire
5127  VL, as that determines "significantly". We have the delay, but
5128  the bandwidth statistics need to be added for the VL!*/(void) duration;
5129 
5131  "Sending FC seq %u to %s with new window %llu\n",
5132  (unsigned int) vl->fc_seq_gen,
5133  GNUNET_i2s (&vl->target),
5134  (unsigned long long) vl->incoming_fc_window_size);
5136  vl->last_fc_transmission = monotime;
5138  fc.header.size = htons (sizeof(fc));
/* each FC message consumes one sequence number */
5139  fc.seq = htonl (vl->fc_seq_gen++);
5145  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
/* an RTT of "forever" is treated as a failed transmission */
5147  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5148  {
5151  "FC retransmission to %s failed, will retry in %s\n",
5152  GNUNET_i2s (&vl->target),
5155  }
5156  else
5157  {
5158  /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5159  vl->last_fc_rtt = rtt;
5160  }
5161  if (NULL != vl->fc_retransmit_task)
5164  {
5166  vl->fc_retransmit_count = 0;
5167  }
5168  vl->fc_retransmit_task =
5170  vl->fc_retransmit_count++;
5171 }
5172 
5173 
/**
 * Check whether virtual link `vl` has a pending message that may be sent
 * now and, if so, notify suitable queues.
 *
 * Messages already assigned to a queue entry are skipped.  For the first
 * remaining message: if it does not fit into the link's outbound
 * flow-control window, an FC message is considered and the function
 * returns.  Otherwise idle, validated queues of the direct neighbour and
 * of the next hops of validated DV paths are processed (the call made
 * for each such queue is not visible; see the note below).  A DV path is
 * skipped when the message does not fit the next hop's own flow-control
 * window.
 *
 * NOTE(review): the embedded numbering jumps over 5191, 5199, 5208, 5213,
 * 5215, 5221, 5225, 5238, 5241, 5243, 5247, 5273, 5290, 5293, 5295, 5299,
 * 5307 and 5314: the signature (introducing `vl`), the right-hand side
 * of the window comparison, the openings of the log calls and the
 * scheduling call made for each eligible queue are not visible in this
 * listing.  As listed, the `break` at the end of the loop body follows
 * the conditional log statement, so the loop appears to stop after the
 * first message that is not already queued -- confirm.
 */
5190 static void
5192 {
5193  struct Neighbour *n = vl->n;
5194  struct DistanceVector *dv = vl->dv;
5195  struct GNUNET_TIME_Absolute now;
5196  struct VirtualLink *vl_next_hop;
5197  int elig;
5198 
5200  "check_vl_transmission to target %s\n",
5201  GNUNET_i2s (&vl->target));
5202  /* Check that we have an eligible pending message!
5203  (cheaper than having #transmit_on_queue() find out!) */
5204  elig = GNUNET_NO;
5205  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5206  pm = pm->next_vl)
5207  {
5209  "check_vl_transmission loop\n");
5210  if (NULL != pm->qe)
5211  continue; /* not eligible, is in a queue! */
5212  if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5214  {
5216  "Stalled message %llu transmission on VL %s due to flow control: %llu < %llu\n",
5217  pm->logging_uuid,
5218  GNUNET_i2s (&vl->target),
5219  (unsigned long long) vl->outbound_fc_window_size,
5220  (unsigned long long) (pm->bytes_msg
5222  consider_sending_fc (vl);
5223  return; /* We have a message, but flow control says "nope" */
5224  }
5226  "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5227  GNUNET_i2s (&vl->target));
5228  /* Notify queues at direct neighbours that we are interested */
5229  now = GNUNET_TIME_absolute_get ();
5230  if (NULL != n)
5231  {
5232  for (struct Queue *queue = n->queue_head; NULL != queue;
5233  queue = queue->next_neighbour)
5234  {
/* only idle queues whose validation has not expired are used */
5235  if ((GNUNET_YES == queue->idle) &&
5236  (queue->validated_until.abs_value_us > now.abs_value_us))
5237  {
5239  "Direct neighbour %s not stalled\n",
5240  GNUNET_i2s (&n->pid));
5242  queue,
5244  elig = GNUNET_YES;
5245  }
5246  else
5248  "Neighbour Queue QID: %u (%u) busy or invalid\n",
5249  queue->qid,
5250  queue->idle);
5251  }
5252  }
5253  /* Notify queues via DV that we are interested */
5254  if (NULL != dv)
5255  {
5256  /* Do DV with lower scheduler priority, which effectively means that
5257  IF a neighbour exists and is available, we prefer it. */
5258  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5259  pos = pos->next_dv)
5260  {
5261  struct Neighbour *nh = pos->next_hop;
5262 
5263 
5264  if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5265  continue; /* skip this one: path not validated */
5266  else
5267  {
/* the next hop must itself have a virtual link (asserted) */
5268  vl_next_hop = lookup_virtual_link (&nh->pid);
5269  GNUNET_assert (NULL != vl_next_hop);
5270  if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5271  vl_next_hop->outbound_fc_window_size)
5272  {
5274  "Stalled message %llu transmission on next hop %s due to flow control: %llu < %llu\n",
5275  pm->logging_uuid,
5276  GNUNET_i2s (&vl_next_hop->target),
5277  (unsigned long
5278  long) vl_next_hop->outbound_fc_window_size,
5279  (unsigned long long) (pm->bytes_msg
5280  + vl_next_hop->
5281  outbound_fc_window_size_used));
5282  consider_sending_fc (vl_next_hop);
5283  continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5284  }
5285  for (struct Queue *queue = nh->queue_head; NULL != queue;
5286  queue = queue->next_neighbour)
5287  if ((GNUNET_YES == queue->idle) &&
5288  (queue->validated_until.abs_value_us > now.abs_value_us))
5289  {
5291  "Next hop neighbour %s not stalled\n",
5292  GNUNET_i2s (&nh->pid));
5294  queue,
5296  elig = GNUNET_YES;
5297  }
5298  else
5300  "DV Queue QID: %u (%u) busy or invalid\n",
5301  queue->qid,
5302  queue->idle);
5303  }
5304  }
5305  }
5306  if (GNUNET_YES == elig)
5308  "Eligible message %llu of size %u to %s: %llu/%llu\n",
5309  pm->logging_uuid,
5310  pm->bytes_msg,
5311  GNUNET_i2s (&vl->target),
5312  (unsigned long long) vl->outbound_fc_window_size,
5313  (unsigned long long) (pm->bytes_msg
5315  break;
5316  }
5317 }
5318 
5319 
/**
 * Handle an outbound-message request from a CORE client: wrap the payload
 * into a new `struct PendingMessage`, attach it to the virtual link of
 * the destination peer and trigger check_vl_transmission().
 *
 * If no confirmed virtual link to the peer exists, the message is not
 * queued and a statistic is updated (this is explicitly not treated as a
 * protocol violation).
 *
 * NOTE(review): the embedded numbering jumps over 5334, 5343, 5350-5351,
 * 5359, 5369, 5374, 5378 and 5383: the declaration of `pp`, the openings
 * of the log and statistics calls, a statement in the "no link" branch,
 * the openings of the two list insertions that take `pm` as last
 * argument, and the final statement are not visible in this listing.
 *
 * cls: the `struct TransportClient` (asserted to be CT_CORE)
 * obm: the request; the nested message follows it directly
 */
5326 static void
5327 handle_client_send (void *cls, const struct OutboundMessage *obm)
5328 {
5329  struct TransportClient *tc = cls;
5330  struct PendingMessage *pm;
5331  const struct GNUNET_MessageHeader *obmm;
5332  uint32_t bytes_msg;
5333  struct VirtualLink *vl;
5335 
5336  GNUNET_assert (CT_CORE == tc->type);
5337  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5338  bytes_msg = ntohs (obmm->size);
5339  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5340  vl = lookup_virtual_link (&obm->peer);
5341  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5342  {
5344  "Don't have %s as a neighbour (anymore).\n",
5345  GNUNET_i2s (&obm->peer));
5346  /* Failure: don't have this peer as a neighbour (anymore).
5347  Might have gone down asynchronously, so this is NOT
5348  a protocol violation by CORE. Still count the event,
5349  as this should be rare. */
5352  "# messages dropped (neighbour unknown)",
5353  1,
5354  GNUNET_NO);
5355  return;
5356  }
5357 
/* the payload is stored directly behind the PendingMessage struct */
5358  pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5360  "1 created pm %p storing vl %p\n",
5361  pm,
5362  vl);
5363  pm->logging_uuid = logging_uuid_gen++;
5364  pm->prefs = pp;
5365  pm->client = tc;
5366  pm->vl = vl;
5367  pm->bytes_msg = bytes_msg;
5368  memcpy (&pm[1], obmm, bytes_msg);
5370  "Sending %u bytes as <%llu> to %s\n",
5371  bytes_msg,
5372  pm->logging_uuid,
5373  GNUNET_i2s (&obm->peer));
/* pm is linked into both the client's and the virtual link's lists */
5375  tc->details.core.pending_msg_head,
5376  tc->details.core.pending_msg_tail,
5377  pm);
5379  vl->pending_msg_head,
5380  vl->pending_msg_tail,
5381  pm);
5382  check_vl_transmission (vl);
5384 }
5385 
5386 
/**
 * Handle a backchannel request from a communicator: wrap the nested
 * message and the trailing 0-terminated string into an encapsulation
 * message built in `mbuf` and send it towards the peer cb->pid.
 *
 * If a confirmed virtual link to the peer exists, the (hidden) statement
 * in the first branch is used; otherwise the message is routed via the
 * direct neighbour, if one is known.
 *
 * NOTE(review): the embedded numbering jumps over 5397, 5412-5414, 5418,
 * 5426, 5437, 5444 and 5449: the function-name line, the rest of the
 * `mbuf` declaration and the declaration of `be`, the opening of the log
 * call, the message type assigned to the header, the routing call in the
 * virtual-link branch, the callee of the neighbour branch and the final
 * statement are not visible in this listing.
 */
5396 static void
5398  void *cls,
5399  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
5400 {
5401  struct Neighbour *n;
5402  struct VirtualLink *vl;
5403  struct TransportClient *tc = cls;
5404  const struct GNUNET_MessageHeader *inbox =
5405  (const struct GNUNET_MessageHeader *) &cb[1];
5406  uint16_t isize = ntohs (inbox->size);
/* the string starts right after the nested message */
5407  const char *is = ((const char *) &cb[1]) + isize;
5408  size_t slen = strlen (is) + 1;
5409  char
5410  mbuf[slen + isize
5411  + sizeof(struct
5415 
5416  /* 0-termination of 'is' was checked already in
5417  #check_communicator_backchannel() */
5419  "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5420  GNUNET_i2s (&cb->pid),
5421  is,
5422  ntohs (inbox->type),
5423  ntohs (inbox->size));
5424  /* encapsulate and encrypt message */
5425  be->header.type =
5427  be->header.size = htons (sizeof(mbuf));
5428  memcpy (&be[1], inbox, isize);
5429  memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5430  + isize],
5431  is,
5432  strlen (is) + 1);
5433  // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5434  vl = lookup_virtual_link (&cb->pid);
5435  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5436  {
5438  }
5439  else
5440  {
5441  /* Use route via neighbour */
5442  n = lookup_neighbour (&cb->pid);
5443  if (NULL != n)
5445  n,
5446  &be->header,
5447  RMO_NONE);
5448  }
5450 }
5451 
5452 
/**
 * Validation step for an "add address" message: rejects the message
 * unless the sending client is registered as CT_COMMUNICATOR.
 *
 * NOTE(review): the embedded numbering jumps over 5461 and 5471; the
 * function-name line (with the `cls` parameter) and a statement before
 * the final return (presumably the 0-termination check that the handler
 * relies on) are not visible in this listing.
 *
 * aam: the message to validate
 * Returns #GNUNET_OK or #GNUNET_SYSERR.
 */
5460 static int
5462  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5463 {
5464  struct TransportClient *tc = cls;
5465 
5466  if (CT_COMMUNICATOR != tc->type)
5467  {
5468  GNUNET_break (0);
5469  return GNUNET_SYSERR;
5470  }
5472  return GNUNET_OK;
5473 }
5474 
5475 
/* Forward declaration: the peerstore completion callback below passes
   &store_pi as a task function before store_pi() is defined. */
5481 static void
5482 store_pi (void *cls);
5483 
5484 
/**
 * Completion callback for storing one of our own addresses in the
 * peerstore: clears the pending store handle (ale->sc), logs success or
 * failure and schedules store_pi() again for the same address entry.
 *
 * NOTE(review): the embedded numbering jumps over 5498, 5502 and 5508;
 * the openings of both log calls and of the scheduling call (including
 * how the delay is derived before the division by 4) are not visible in
 * this listing.
 *
 * cls:     the `struct AddressListEntry` that was stored
 * success: #GNUNET_YES if the store operation succeeded
 */
5491 static void
5492 peerstore_store_own_cb (void *cls, int success)
5493 {
5494  struct AddressListEntry *ale = cls;
5495 
5496  ale->sc = NULL;
5497  if (GNUNET_YES != success)
5499  "Failed to store our own address `%s' in peerstore!\n",
5500  ale->address);
5501  else
5503  "Successfully stored our own address `%s' in peerstore!\n",
5504  ale->address);
5505  /* refresh period is 1/4 of expiration time, that should be plenty
5506  without being excessive. */
5507  ale->st =
5509  4ULL),
5510  &store_pi,
5511  ale);
5512 }
5513 
5514 
/**
 * Task that stores one of our own addresses in the peerstore.
 *
 * Clears the task handle (ale->st), produces a serialized address buffer
 * (`addr`, `addr_len`), starts a peerstore store operation under the
 * sub-system name "transport" for GST_my_identity and frees the buffer.
 * If no store handle was obtained (ale->sc is NULL), a failure is logged
 * and a new task is assigned to ale->st.
 *
 * NOTE(review): the embedded numbering jumps over 5526, 5529-5530,
 * 5533-5534, 5536-5537, 5540, 5543, 5547-5548, 5553 and 5557: the
 * declaration and computation of `expiration`, the openings of the log
 * calls, the call that fills `addr`/`addr_len`, the opening and several
 * arguments of the store call and the retry scheduling expression are
 * not visible in this listing.
 *
 * cls: the `struct AddressListEntry` to store
 */
5520 static void
5521 store_pi (void *cls)
5522 {
5523  struct AddressListEntry *ale = cls;
5524  void *addr;
5525  size_t addr_len;
5527 
5528  ale->st = NULL;
5531  "Storing our address `%s' in peerstore until %s!\n",
5532  ale->address,
5535  ale->nt,
5538  &addr,
5539  &addr_len);
5541  "transport",
5542  &GST_my_identity,
5544  addr,
5545  addr_len,
5546  expiration,
5549  ale);
/* the serialized buffer is released right after the store call */
5550  GNUNET_free (addr);
5551  if (NULL == ale->sc)
5552  {
5554  "Failed to store our address `%s' with peerstore\n",
5555  ale->address);
5556  ale->st =
5558  }
5559 }
5560 
5561 
/**
 * Handle an "add address" message from a communicator: create an
 * `struct AddressListEntry` holding a copy of the address string, add it
 * to the communicator's address list and schedule store_pi() for it
 * immediately.
 *
 * NOTE(review): the embedded numbering jumps over 5569, 5577 and 5592;
 * the function-name line (with the `cls` parameter), the opening of the
 * log call and the final statement are not visible in this listing.
 *
 * aam: the message; the address string follows it directly
 */
5568 static void
5570  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5571 {
5572  struct TransportClient *tc = cls;
5573  struct AddressListEntry *ale;
5574  size_t slen;
5575 
5576  /* 0-termination of &aam[1] was checked in #check_add_address */
5578  "Communicator added address `%s'!\n",
5579  (const char *) &aam[1]);
/* slen counts all trailing bytes */
5580  slen = ntohs (aam->header.size) - sizeof(*aam);
5581  ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5582  ale->tc = tc;
/* the address lives in the same allocation, right behind the entry */
5583  ale->address = (const char *) &ale[1];
5584  ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
5585  ale->aid = aam->aid;
5586  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5587  memcpy (&ale[1], &aam[1], slen);
5588  GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5589  tc->details.communicator.addr_tail,
5590  ale);
5591  ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5593 }
5594 
5595 
/**
 * Handle a "delete address" message from a communicator: find the entry
 * of this communicator's address list whose `aid` equals dam->aid and
 * process it (the processing statements are not visible), then return.
 *
 * A client that is not CT_COMMUNICATOR is dropped.  If no entry matches,
 * the function falls through to the final log statement; dropping the
 * client in that case is commented out.
 *
 * NOTE(review): the embedded numbering jumps over 5603, 5623, 5626-5627,
 * 5630 and 5632; the function-name line (with the `cls` parameter), the
 * openings of the log calls, the statements that act on the matching
 * entry and a statement after the final log call are not visible in
 * this listing.
 *
 * dam: the message naming the address id to delete
 */
5602 static void
5604  const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5605 {
5606  struct TransportClient *tc = cls;
5607  struct AddressListEntry *alen;
5608 
5609  if (CT_COMMUNICATOR != tc->type)
5610  {
5611  GNUNET_break (0);
5612  GNUNET_SERVICE_client_drop (tc->client);
5613  return;
5614  }
5615  for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5616  NULL != ale;
5617  ale = alen)
5618  {
/* remember the successor before `ale` is processed */
5619  alen = ale->next;
5620  if (dam->aid != ale->aid)
5621  continue;
5622  GNUNET_assert (ale->tc == tc);
5624  "Communicator deleted address `%s'!\n",
5625  ale->address);
5628  return;
5629  }
5631  "Communicator removed address we did not even have.\n");
5633  // GNUNET_SERVICE_client_drop (tc->client);
5634 }
5635 
5636 
5644 static void
5646 
5647 
/**
 * Callback invoked with a `struct CoreSentContext`; updates the incoming
 * flow-control accounting of the associated virtual link and releases the
 * context.
 *
 * NOTE(review): two lines between the NULL check and the counter updates
 * are missing from this listing.
 *
 * @param cls the `struct CoreSentContext`; freed by this function
 */
5655 static void
5656 core_env_sent_cb (void *cls)
5657 {
5658  struct CoreSentContext *ctx = cls;
5659  struct VirtualLink *vl = ctx->vl;
5660 
5661  if (NULL == vl)
5662  {
5663  /* lost the link in the meantime, ignore */
5664  GNUNET_free (ctx);
5665  return;
5666  }
  /* ctx->size is taken out of the RAM counter, ctx->isize is added to the
     "used" counter; then flow control is re-evaluated for the link. */
5669  vl->incoming_fc_window_size_ram -= ctx->size;
5670  vl->incoming_fc_window_size_used += ctx->isize;
5671  consider_sending_fc (vl);
5672  GNUNET_free (ctx);
5673 }
5674 
5675 
/**
 * Delivers message @a mh to every connected CORE client, after two
 * flow-control checks on the virtual link `vl`.
 *
 * Every early-exit path calls #finish_cmc_handling_with_continue with
 * @a cmc and @a continue_client.  After a successful delivery that call is
 * made here only if `vl->core_recv_window` is still positive.
 *
 * NOTE(review): the line carrying the function name and the `vl`
 * parameter is missing from this listing, as are the heads of the
 * logging/statistics calls, the second window check and the envelope
 * creation.
 *
 * @param mh the message to deliver
 * @param cmc context of the communicator message being handled
 * @param continue_client passed through to #finish_cmc_handling_with_continue
 */
5676 static void
5678  const struct GNUNET_MessageHeader *mh,
5679  struct CommunicatorMessageContext *cmc,
5680  unsigned int continue_client)
5681 {
5682  uint16_t size = ntohs (mh->size);
5683  int have_core;
5684 
  /* Guard against wrap-around before this message's size is accounted. */
5685  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5686  {
5688  "# CORE messages dropped (FC arithmetic overflow)",
5689  1,
5690  GNUNET_NO);
5692  "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5693  (unsigned int) ntohs (mh->type),
5694  (unsigned int) ntohs (mh->size));
5695  finish_cmc_handling_with_continue (cmc, continue_client);
5696  return;
5697  }
  /* NOTE(review): the condition guarding the next block is not visible;
     per its messages it detects a flow-control window overflow. */
5699  {
5701  "# CORE messages dropped (FC window overflow)",
5702  1,
5703  GNUNET_NO);
5705  "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5706  (unsigned int) ntohs (mh->type),
5707  (unsigned int) ntohs (mh->size));
5708  finish_cmc_handling_with_continue (cmc, continue_client);
5709  return;
5710  }
5711 
5712  /* Forward to all CORE clients */
5713  have_core = GNUNET_NO;
5714  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5715  {
5716  struct GNUNET_MQ_Envelope *env;
5717  struct InboundMessage *im;
5718  struct CoreSentContext *ctx;
5719 
5720  if (CT_CORE != tc->type)
5721  continue;
5724  ctx = GNUNET_new (struct CoreSentContext);
5725  ctx->vl = vl;
5726  ctx->size = size;
  /* isize equals the message size for the first CORE client only and is
     0 for every further client. */
5727  ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5728  have_core = GNUNET_YES;
5731  im->peer = cmc->im.sender;
5732  memcpy (&im[1], mh, size);
5733  GNUNET_MQ_send (tc->mq, env);
  /* One unit of the CORE receive window is consumed per client copy. */
5734  vl->core_recv_window--;
5735  }
5736  if (GNUNET_NO == have_core)
5737  {
5739  "Dropped message to CORE: no CORE client connected!\n");
5740  /* Nevertheless, count window as used, as it is from the
5741  perspective of the other peer! */
5743  /* TODO-M1 */
5745  "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
5746  (unsigned int) ntohs (mh->type),
5747  (unsigned int) ntohs (mh->size));
5748  finish_cmc_handling_with_continue (cmc, continue_client);
5749  return;
5750  }
5752  "Delivered message from %s of type %u to CORE recv window %u\n",
5753  GNUNET_i2s (&cmc->im.sender),
5754  ntohs (mh->type),
5755  vl->core_recv_window);
5756  if (vl->core_recv_window > 0)
5757  {
5758  finish_cmc_handling_with_continue (cmc, continue_client);
5759  return;
5760  }
5761  /* Wait with calling #finish_cmc_handling(cmc) until the message
5762  was processed by CORE MQs (for CORE flow control)! */
5764 }
5765 
5766 
/**
 * Handles a raw message received from a communicator.
 *
 * Messages with an implausible size cause the sending client to be
 * dropped.  If there is no confirmed virtual link for the sender, a heap
 * copy of the message is stored in the ring buffer at `ring_buffer_head`
 * and the function returns without finishing @a cls.
 *
 * NOTE(review): several lines are missing from this listing, among them
 * the condition that wraps `ring_buffer_head` back to 0 and the final
 * statement executed when a confirmed virtual link exists.
 *
 * @param cls the `struct CommunicatorMessageContext`
 * @param mh the message that was received
 */
5775 static void
5776 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5777 {
5778  struct CommunicatorMessageContext *cmc = cls;
5779  // struct CommunicatorMessageContext *cmc_copy =
5780  // GNUNET_new (struct CommunicatorMessageContext);
5781  struct GNUNET_MessageHeader *mh_copy;
5782  struct RingBufferEntry *rbe;
5783  struct VirtualLink *vl;
5784  uint16_t size = ntohs (mh->size);
5785 
5787  "Handling raw message of type %u with %u bytes\n",
5788  (unsigned int) ntohs (mh->type),
5789  (unsigned int) ntohs (mh->size));
5790 
  /* The message must still fit into a 16-bit size once a struct
     InboundMessage is put in front of it, and must at least hold a header. */
5791  if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
5792  (size < sizeof(struct GNUNET_MessageHeader)))
5793  {
  /* Save the client pointer before cmc is handed to finish_cmc_handling. */
5794  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5795 
5796  GNUNET_break (0);
5797  finish_cmc_handling (cmc);
5798  GNUNET_SERVICE_client_drop (client);
5799  return;
5800  }
5801  vl = lookup_virtual_link (&cmc->im.sender);
5802  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5803  {
5804  /* FIXME: sender is giving us messages for CORE but we don't have
5805  the link up yet! I *suspect* this can happen right now (i.e.
5806  sender has verified us, but we didn't verify sender), but if
5807  we pass this on, CORE would be confused (link down, messages
5808  arrive). We should investigate more if this happens often,
5809  or in a persistent manner, and possibly do "something" about
5810  it. Thus logging as error for now. */
5811 
  /* NOTE(review): the FIXME above speaks of logging an error, but the
     visible code instead keeps a copy of the message in the ring buffer. */
5812  mh_copy = GNUNET_malloc (size);
5813  rbe = GNUNET_new (struct RingBufferEntry);
5814  rbe->cmc = cmc;
5815  /*cmc_copy->tc = cmc->tc;
5816  cmc_copy->im = cmc->im;*/
5817  GNUNET_memcpy (mh_copy, mh, size);
5818 
5819  rbe->mh = mh_copy;
5820 
  /* NOTE(review): whatever this slot held before is overwritten; no
     release of a previous entry is visible here - TODO confirm. */
5821  ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
5822  // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
5823  cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
5825  "Storing message for %s and type %u (%u) in ring buffer\n",
5826  GNUNET_i2s (&cmc->im.sender),
5827  (unsigned int) ntohs (mh->type),
5828  (unsigned int) ntohs (mh_copy->type));
5830  {
5831  ring_buffer_head = 0;
5833  }
5834  else
5835  ring_buffer_head++;
5836 
5838  "%u items stored in ring buffer\n",
5840 
5841  /*GNUNET_break_op (0);
5842  GNUNET_STATISTICS_update (GST_stats,
5843  "# CORE messages dropped (virtual link still down)",
5844  1,
5845  GNUNET_NO);
5846 
5847  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
5848  "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
5849  (unsigned int) ntohs (mh->type),
5850  (unsigned int) ntohs (mh->size));
5851  finish_cmc_handling (cmc);*/
5853  // GNUNET_free (cmc);
5854  return;
5855  }
5857 }
5858 
5859 
/**
 * Sanity-checks a fragment box message `fb`: the fragment payload must be
 * non-empty and must lie entirely within the announced message size.
 *
 * NOTE(review): the line carrying the function name and its parameter
 * list is missing from this listing.
 *
 * @return #GNUNET_YES if the message is well-formed, #GNUNET_SYSERR if not
 */
5867 static int
5869 {
5870  uint16_t size = ntohs (fb->header.size);
  /* Payload bytes carried by this fragment (total size minus box header). */
5871  uint16_t bsize = size - sizeof(*fb);
5872 
5873  (void) cls;
5874  if (0 == bsize)
5875  {
5876  GNUNET_break_op (0);
5877  return GNUNET_SYSERR;
5878  }
  /* The fragment must not extend beyond the end of the whole message. */
5879  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
5880  {
5881  GNUNET_break_op (0);
5882  return GNUNET_SYSERR;
5883  }
  /* NOTE(review): bsize is at least 1 at this point, so the previous check
     already rejects every frag_off >= msg_size; this branch therefore
     looks unreachable. */
5884  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
5885  {
5886  GNUNET_break_op (0);
5887  return GNUNET_SYSERR;
5888  }
5889  return GNUNET_YES;
5890 }
5891 
5892 
/**
 * Task callback that releases a `struct AcknowledgementCummulator` which
 * has no acknowledgements queued.
 *
 * NOTE(review): the line carrying the function name and parameter, and the
 * call whose result is asserted to be #GNUNET_YES, are missing from this
 * listing.
 */
5898 static void
5900 {
5901  struct AcknowledgementCummulator *ac = cls;
5902 
  /* The task is running, so its handle is no longer valid. */
5903  ac->task = NULL;
  /* Destroying a cumulator that still holds ACKs would lose them. */
5904  GNUNET_assert (0 == ac->num_acks);
5905  GNUNET_assert (
5906  GNUNET_YES ==
5908  GNUNET_free (ac);
5909 }
5910 
5911 
/**
 * Task callback that sends all acknowledgements collected in a
 * `struct AcknowledgementCummulator` in one reliability ACK message.
 *
 * The message is built in a stack buffer sized for `ac->num_acks` payload
 * entries.  It goes out via the confirmed virtual link to `ac->target` if
 * one exists, otherwise via the neighbour entry for the target if one
 * exists.  Afterwards `ac->num_acks` is reset to 0.
 *
 * NOTE(review): the line carrying the function name and parameter is
 * missing from this listing, as are the heads of several calls, the
 * declaration of `ap` and the final statement that uses `ac`.
 */
5917 static void
5919 {
5920  struct Neighbour *n;
5921  struct VirtualLink *vl;
5922  struct AcknowledgementCummulator *ac = cls;
5923  char buf[sizeof(struct TransportReliabilityAckMessage)
5924  + ac->num_acks
5926  struct TransportReliabilityAckMessage *ack =
5929 
5930  ac->task = NULL;
5932  "Sending ACK with %u components to %s\n",
5933  ac->num_acks,
5934  GNUNET_i2s (&ac->target));
5935  GNUNET_assert (0 < ac->num_acks);
5937  ack->header.size =
5938  htons (sizeof(*ack)
5939  + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
  /* Advance the cumulator's running counter by the number of ACKs in this
     message and transmit the new value. */
5940  ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
5941  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
5942  for (unsigned int i = 0; i < ac->num_acks; i++)
5943  {
5944  ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
  /* The argument below is the time elapsed since the ACK was queued. */
5946  GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5947  }
5948  /*route_control_message_without_fc (
5949  &ac->target,
5950  &ack->header,
5951  RMO_DV_ALLOWED);*/
5952  vl = lookup_virtual_link (&ac->target);
5953  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5954  {
5956  vl,
5957  &ack->header,
5958  RMO_DV_ALLOWED);
5959  }
5960  else
5961  {
5962  /* Use route via neighbour */
5963  n = lookup_neighbour (&ac->target);
5964  if (NULL != n)
5966  n,
5967  &ack->header,
5968  RMO_NONE);
5969  }
5970  ac->num_acks = 0;
5973  ac);
5974 }
5975 
5976 
/**
 * Queues acknowledgement @a ack_uuid for transmission and (re)schedules
 * the transmission task for the cumulator's `min_transmission_time`.
 *
 * For an existing cumulator the pending task is cancelled and
 * `min_transmission_time` becomes the earlier of its old value and
 * @a max_delay.
 *
 * NOTE(review): the line carrying the function name and the `pid`
 * parameter is missing from this listing, as are the lookup and creation
 * of the cumulator, the statement executed when the ACK buffer is full,
 * and the callback argument of the final scheduling call.
 *
 * @param ack_uuid the acknowledgement to queue
 * @param max_delay latest point in time at which the ACK should be sent
 */
5985 static void
5987  const struct AcknowledgementUUIDP *ack_uuid,
5988  struct GNUNET_TIME_Absolute max_delay)
5989 {
5990  struct AcknowledgementCummulator *ac;
5991 
5993  "Scheduling ACK %s for transmission to %s\n",
5994  GNUNET_uuid2s (&ack_uuid->value),
5995  GNUNET_i2s (pid));
5997  if (NULL == ac)
5998  {
6000  ac->target = *pid;
6001  ac->min_transmission_time = max_delay;
6005  &ac->target,
6006  ac,
6008  }
6009  else
6010  {
6011  if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
6012  {
6013  /* must run immediately, ack buffer full! */
6015  }
  /* The task is re-created at the end of this function. */
6016  GNUNET_SCHEDULER_cancel (ac->task);
6017  ac->min_transmission_time =
6018  GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
6019  }
  /* There must be a free slot for the new ACK at this point. */
6020  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
6021  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
6022  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6023  ac->num_acks++;
6024  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
6026  ac);
6027 }
6028 
6029 
/* Body of a struct whose header line is missing from this listing.
   NOTE(review): presumably `struct FindByMessageUuidContext`, the closure
   used by #find_by_message_uuid, which also accesses an `rc` member that
   is not visible here - TODO confirm. */
6034 {
  /* UUID of the message being searched for. */
6038  struct MessageUUIDP message_uuid;
6039 
6044 };
6045 
6046 
6056 static int
6057 find_by_message_uuid (void *cls, uint32_t key, void *value)
6058 {
6059  struct FindByMessageUuidContext *fc = cls;
6060  struct ReassemblyContext *rc = value;
6061 
6062  (void) key;
6063  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6064  {
6065  fc->rc = rc;
6066  return GNUNET_NO;
6067  }
6068  return GNUNET_YES;
6069 }
6070 
6071 
/**
 * Handles a fragment box `fb` received from a communicator: copies the
 * fragment into the reassembly buffer of its message, records the covered
 * bytes in a bitfield, queues an acknowledgement and, once no bytes are
 * missing, passes the reassembled message to #demultiplex_with_cmc.
 *
 * Layout of a reassembly context allocation: the struct itself, then
 * `msize` payload bytes, then a bitfield with one bit per payload byte.
 *
 * NOTE(review): the line carrying the function name and parameters is
 * missing from this listing, as are the heads of several calls (logging,
 * map/heap creation and lookup, timeout setup, the delay passed to
 * #cummulative_ack) and the statements that release the context.
 */
6079 static void
6081 {
6082  struct CommunicatorMessageContext *cmc = cls;
6083  struct VirtualLink *vl;
6084  struct ReassemblyContext *rc;
6085  const struct GNUNET_MessageHeader *msg;
6086  uint16_t msize;
6087  uint16_t fsize;
6088  uint16_t frag_off;
6089  char *target;
6090  struct GNUNET_TIME_Relative cdelay;
6091  struct FindByMessageUuidContext fc;
6092 
6093  vl = lookup_virtual_link (&cmc->im.sender);
6094  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6095  {
  /* Save the client pointer before cmc is handed to finish_cmc_handling. */
6096  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6097 
6099  "No virtual link for %s to handle fragment\n",
6100  GNUNET_i2s (&cmc->im.sender));
6101  GNUNET_break (0);
6102  finish_cmc_handling (cmc);
6103  GNUNET_SERVICE_client_drop (client);
6104  return;
6105  }
  /* The reassembly data structures of a link are created on first use. */
6106  if (NULL == vl->reassembly_map)
6107  {
6109  vl->reassembly_heap =
6114  vl);
6115  }
6116  msize = ntohs (fb->msg_size);
  /* Search for an existing reassembly context with this message UUID;
     fc.rc stays NULL if there is none. */
6117  fc.message_uuid = fb->msg_uuid;
6118  fc.rc = NULL;
6120  fb->msg_uuid.uuid,
6122  &fc);
  /* Payload bytes carried by this fragment. */
6123  fsize = ntohs (fb->header.size) - sizeof(*fb);
6124  if (NULL == (rc = fc.rc))
6125  {
6126  rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6127  + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6128  rc->msg_uuid = fb->msg_uuid;
6129  rc->virtual_link = vl;
6130  rc->msg_size = msize;
6131  rc->reassembly_timeout =
6135  rc,
6139  vl->reassembly_map,
6140  rc->msg_uuid.uuid,
6141  rc,
6143  target = (char *) &rc[1];
6144  rc->bitfield = (uint8_t *) (target + rc->msg_size);
  /* NOTE(review): if this fragment covers the whole message, msg_missing
     starts at 0, yet the bit loop further down still decrements it once
     for every newly set bit.  With a zeroed bitfield and an unsigned
     counter that would wrap around - TODO verify. */
6145  if (fsize != rc->msg_size)
6146  rc->msg_missing = rc->msg_size;
6147  else
6148  rc->msg_missing = 0;
6150  "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
6151  fsize,
6152  ntohs (fb->frag_off),
6153  msize,
6154  rc->msg_missing,
6155  GNUNET_i2s (&cmc->im.sender),
6156  (unsigned int) fb->msg_uuid.uuid);
6157  }
6158  else
6159  {
6160  target = (char *) &rc[1];
6162  "Received fragment at offset %u/%u from %s for message %u\n",
6163  ntohs (fb->frag_off),
6164  msize,
6165  GNUNET_i2s (&cmc->im.sender),
6166  (unsigned int) fb->msg_uuid.uuid);
6167  }
  /* All fragments of one message must announce the same total size. */
6168  if (msize != rc->msg_size)
6169  {
6170  GNUNET_break (0);
6171  finish_cmc_handling (cmc);
6172  return;
6173  }
6174 
6175  /* reassemble */
6176  if (0 == fsize)
6177  {
6178  GNUNET_break (0);
6179  finish_cmc_handling (cmc);
6180  return;
6181  }
6182  frag_off = ntohs (fb->frag_off);
6183  if (frag_off + fsize > msize)
6184  {
6185  /* Fragment (plus fragment size) exceeds message size! */
6186  GNUNET_break_op (0);
6187  finish_cmc_handling (cmc);
6188  return;
6189  }
6190  memcpy (&target[frag_off], &fb[1], fsize);
6191  /* update bitfield and msg_missing */
6192  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6193  {
  /* Only bytes not seen before reduce the missing counter, so duplicate
     or overlapping fragments are not counted twice. */
6194  if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6195  {
6196  rc->bitfield[i / 8] |= (1 << (i % 8));
6197  rc->msg_missing--;
6198  }
6199  }
6200 
6201  /* Compute cumulative ACK */
  /* The delay is scaled by the number of fragments of this size that are
     still outstanding; fsize is non-zero here (checked above). */
6203  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6204  if (0 == rc->msg_missing)
6205  cdelay = GNUNET_TIME_UNIT_ZERO;
6206  cummulative_ack (&cmc->im.sender,
6207  &fb->ack_uuid,
6210  /* is reassembly complete? */
6211  if (0 != rc->msg_missing)
6212  {
6213  finish_cmc_handling (cmc);
6214  return;
6215  }
6216  /* reassembly is complete, verify result */
6217  msg = (const struct GNUNET_MessageHeader *) &rc[1];
6218  if (ntohs (msg->size) != rc->msg_size)
6219  {
6220  GNUNET_break (0);
6222  finish_cmc_handling (cmc);
6223  return;
6224  }
6225  /* successful reassembly */
6227  "Fragment reassembly complete for message %u\n",
6228  (unsigned int) fb->msg_uuid.uuid);
6229  /* FIXME: check that the resulting msg is NOT a
6230  DV Box or Reliability Box, as that is NOT allowed! */
6231  cmc->mh = msg;
6232  demultiplex_with_cmc (cmc);
6233  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6234  en-route and we forget that we finished this reassembly immediately!
6235  -> keep around until timeout?
6236  -> shorten timeout based on ACK? */
6238 }
6239 
6240 
/**
 * Check callback for a reliability box message: logs the sizes and type of
 * the box and of the message that follows it, then accepts the message.
 *
 * NOTE(review): the line carrying the function name and the `cls`
 * parameter is missing from this listing, as are the head of the logging
 * call and one statement before the return; whether that statement
 * validates the inner message cannot be told from here.
 *
 * @param rb the reliability box message
 * @return #GNUNET_YES on every path visible here
 */
6248 static int
6250  const struct TransportReliabilityBoxMessage *rb)
6251 {
6252  (void) cls;
  /* The boxed message directly follows the reliability box header. */
6253  const struct GNUNET_MessageHeader *inbox = (const struct
6254  GNUNET_MessageHeader *) &rb[1];
6255 
6257  "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6258  ntohs (rb->header.size),
6259  ntohs (inbox->type),
6260  ntohs (inbox->size),
6261  sizeof (struct TransportReliabilityBoxMessage),
6262  sizeof (struct GNUNET_MessageHeader));
6264  return GNUNET_YES;
6265 }
6266 
6267 
/**
 * Handles a reliability box: queues an acknowledgement for the box's UUID
 * via #cummulative_ack and then processes the boxed message through
 * #demultiplex_with_cmc.
 *
 * NOTE(review): the line carrying the function name and the `cls`
 * parameter is missing from this listing, as are the head of the logging
 * call and part of the delay expression passed to #cummulative_ack.
 *
 * @param rb the reliability box message
 */
6275 static void
6277  const struct TransportReliabilityBoxMessage *rb)
6278 {
6279  struct CommunicatorMessageContext *cmc = cls;
6280  const struct GNUNET_MessageHeader *inbox =
6281  (const struct GNUNET_MessageHeader *) &rb[1];
6282  struct GNUNET_TIME_Relative rtt;
6283 
6285  "Received reliability box from %s with UUID %s of type %u\n",
6286  GNUNET_i2s (&cmc->im.sender),
6287  GNUNET_uuid2s (&rb->ack_uuid.value),
6288  (unsigned int) ntohs (inbox->type));
6289  rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6290  do not really have an RTT for the
6291  * incoming* queue (should we have
6292  the sender add it to the rb message?) */
  /* The third argument depends on whether rb->ack_countdown is zero; the
     non-zero case involves rtt / 8, the remaining parts are not visible. */
6293  cummulative_ack (
6294  &cmc->im.sender,
6295  &rb->ack_uuid,
6296  (0 == ntohl (rb->ack_countdown))
6299  GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6300  /* continue with inner message */
6301  /* FIXME: check that inbox is NOT a DV Box, fragment or another
6302  reliability box (not allowed!) */
6303  cmc->mh = inbox;
6304  demultiplex_with_cmc (cmc);
6305 }
6306 
6307 
6316 static void
6317 update_pd_age (struct PerformanceData *pd, unsigned int age)
6318 {
6319  unsigned int sage;
6320 
6321  if (age == pd->last_age)
6322  return; /* nothing to do */
6323  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6324  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6325  {
6326  struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6327 
6328  the->bytes_sent = 0;
6329  the->bytes_received = 0;
6330  }
6331  pd->last_age = age;
6332 }
6333 
6334 
/**
 * Folds a new round-trip-time sample and a count of successfully
 * transmitted bytes into the performance data `pd`.
 *
 * NOTE(review): the line carrying the function name and the `pd`
 * parameter is missing from this listing.
 *
 * @param rtt newly measured round-trip time
 * @param bytes_transmitted_ok bytes to add to the current history slot
 */
6343 static void
6345  struct GNUNET_TIME_Relative rtt,
6346  uint16_t bytes_transmitted_ok)
6347 {
6348  uint64_t nval = rtt.rel_value_us;
6349  uint64_t oval = pd->aged_rtt.rel_value_us;
6350  unsigned int age = get_age ();
  /* History slot that belongs to the current age. */
6351  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6352 
  /* The FOREVER value is treated as "no sample yet": adopt the sample as
     is.  Otherwise the new sample gets weight 1/8, the old value 7/8. */
6353  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6354  pd->aged_rtt = rtt;
6355  else
6356  pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
  /* Age the history first, then credit the bytes to the current slot. */
6357  update_pd_age (pd, age);
6358  the->bytes_received += bytes_transmitted_ok;
6359 }
6360 
6361 
/**
 * Thin wrapper that applies #update_performance_data to the performance
 * data embedded in `q` (`q->pd`).
 *
 * NOTE(review): the line carrying the function name and the `q` parameter
 * is missing from this listing.
 *
 * @param rtt newly measured round-trip time
 * @param bytes_transmitted_ok number of bytes successfully transmitted
 */
6369 static void
6371  struct GNUNET_TIME_Relative rtt,
6372  uint16_t bytes_transmitted_ok)
6373 {
6374  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6375 }
6376 
6377 
/**
 * Thin wrapper that applies #update_performance_data to the performance
 * data embedded in `dvh` (`dvh->pd`).
 *
 * NOTE(review): the line carrying the function name and the `dvh`
 * parameter is missing from this listing.
 *
 * @param rtt newly measured round-trip time
 * @param bytes_transmitted_ok number of bytes successfully transmitted
 */
6385 static void
6387  struct GNUNET_TIME_Relative rtt,
6388  uint16_t bytes_transmitted_ok)
6389 {
6390  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6391 }
6392 
6393 
/**
 * Called when transmission of pending message `pm` has completed; what
 * happens next depends on the kind of pending message (`pm->pmt`).
 *
 * For a fragment, the fragment is unlinked from its parent and the tree
 * of parents is walked upwards while each level has no fragments left.
 * #client_send_response is called on an ancestor in the cases visible
 * below.
 *
 * NOTE(review): the line carrying the function name and the `pm`
 * parameter is missing from this listing, as are the heads of the logging
 * calls and several statements (apparently the ones that release pending
 * messages).
 */
6401 static void
6403 {
6404  struct PendingMessage *pos;
6405 
6407  "Complete transmission of message %llu %u\n",
6408  pm->logging_uuid,
6409  pm->pmt);
6410  switch (pm->pmt)
6411  {
6412  case PMT_CORE:
6413  case PMT_RELIABILITY_BOX:
6414  /* Full message sent, we are done */
6416  return;
6417 
6418  case PMT_FRAGMENT_BOX:
6419  /* Fragment sent over reliable channel */
6420  pos = pm->frag_parent;
6421  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6423  /* check if subtree is done */
6424  while ((NULL == pos->head_frag) && (pos->frag_off == (pos->bytes_msg
6425  - sizeof(struct
6427  &&
6428  (NULL != pos->frag_parent))
6429  {
6430  pm = pos;
6431  pos = pm->frag_parent;
  /* NOTE(review): the loop condition guarantees a non-NULL parent, so pos
     cannot be NULL here and the first branch below looks unreachable. */
6432  if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6433  {
6435  return;
6436  }
6437  else if (PMT_DV_BOX == pm->pmt)
6438  {
6439  client_send_response (pos);
6440  return;
6441  }
6442  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6444  }
6445 
6446  /* Was this the last applicable fragment? */
6447  if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
6448  (pos->frag_off == pos->bytes_msg))
6449  client_send_response (pos);
6450  return;
6451 
6452  case PMT_DV_BOX:
6454  "Completed transmission of message %llu (DV Box)\n",
6455  pm->logging_uuid);
6456  if (NULL != pm->frag_parent)
6457  {
6458  if (NULL != pm->bpm)
6459  {
6460  GNUNET_free (pm->bpm);
6462  "Freed bpm\n");
6463  }
  /* Clear the parent's bpm pointer, then report completion for the
     parent. */
6464  pos = pm->frag_parent;
6466  pos->bpm = NULL;
6467  client_send_response (pos);
6468  }
6469  else
6471  return;
6472  }
6473 }
6474 
6475 
/**
 * Processes the acknowledgement of the transmission tracked by `pa`.
 *
 * The visible code subtracts @a ack_delay from a locally computed `delay`
 * and then branches on `pa->queue`, `pa->dvh` (both only when
 * `pa->num_send` is 1) and `pa->pm`.
 *
 * NOTE(review): the line carrying the function name and the `pa`
 * parameter is missing from this listing, as are the initial assignment
 * of `delay` and every statement guarded by the three conditions.
 *
 * @param ack_delay amount subtracted from `delay` below
 */
6483 static void
6485  struct GNUNET_TIME_Relative ack_delay)
6486 {
6487  struct GNUNET_TIME_Relative delay;
6488 
6490  delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6491  if (NULL != pa->queue && 1 == pa->num_send)
6493  if (NULL != pa->dvh && 1 == pa->num_send)
6495  if (NULL != pa->pm)
6498 }
6499 
6500 
/**
 * Sanity-checks a reliability ACK message: the bytes after the fixed
 * header must form a non-empty, whole number of
 * `struct TransportCummulativeAckPayloadP` entries.
 *
 * NOTE(review): the line carrying the function name and the `cls`
 * parameter is missing from this listing.
 *
 * @param ra the ACK message to check
 * @return #GNUNET_OK if the message is well-formed, #GNUNET_SYSERR if not
 */
6508 static int
6510  const struct TransportReliabilityAckMessage *ra)
6511 {
6512  unsigned int n_acks;
6513 
6514  (void) cls;
  /* NOTE(review): this subtraction involves sizeof and would wrap if the
     header size were smaller than sizeof(*ra); presumably excluded by the
     caller - TODO confirm. */
6515  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6516  / sizeof(struct TransportCummulativeAckPayloadP);
6517  if (0 == n_acks)
6518  {
6519  GNUNET_break_op (0);
6520  return GNUNET_SYSERR;
6521  }
  /* Reject trailing bytes that do not make up a complete entry. */
6522  if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6523  n_acks * sizeof(struct TransportCummulativeAckPayloadP))
6524  {
6525  GNUNET_break_op (0);
6526  return GNUNET_SYSERR;
6527  }
6528  return GNUNET_OK;
6529 }
6530 
6531 
/**
 * Handles a reliability ACK message: for every acknowledgement entry in
 * the message, the matching `struct PendingAcknowledgement` is passed to
 * #handle_acknowledged; entries without a match are logged and skipped.
 *
 * NOTE(review): the line carrying the function name and the `cls`
 * parameter is missing from this listing, as are the lookup expression
 * for `pa` and the heads of the logging/statistics calls.
 *
 * @param ra the ACK message
 */
6539 static void
6541  const struct TransportReliabilityAckMessage *ra)
6542 {
6543  struct CommunicatorMessageContext *cmc = cls;
6544  const struct TransportCummulativeAckPayloadP *ack;
6545  unsigned int n_acks;
6546  uint32_t ack_counter;
6547 
  /* Number of payload entries that follow the fixed header. */
6548  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6549  / sizeof(struct TransportCummulativeAckPayloadP);
6550  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
6551  for (unsigned int i = 0; i < n_acks; i++)
6552  {
6553  struct PendingAcknowledgement *pa =
6555  if (NULL == pa)
6556  {
6558  "Received ACK from %s with UUID %s which is unknown to us!\n",
6559  GNUNET_i2s (&cmc->im.sender),
6560  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6562  GST_stats,
6563  "# FRAGMENT_ACKS dropped, no matching pending message",
6564  1,
6565  GNUNET_NO);
6566  continue;
6567  }
6569  "Received ACK from %s with UUID %s\n",
6570  GNUNET_i2s (&cmc->im.sender),
6571  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6572  handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
6573  }
6574 
  /* NOTE(review): the value is read from a received message, so ntohl
     would be the expected spelling; htonl is used here. */
6575  ack_counter = htonl (ra->ack_counter);
6576  (void) ack_counter; /* silence compiler warning for now */
6577  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
6578  // (DV and/or Neighbour?)
6579  finish_cmc_handling (cmc);
6580 }
6581 
6582 
/**
 * Sanity-checks a backchannel encapsulation message `be`: after the
 * fixed header there must be an inner message, followed by at least one
 * more byte, and the last byte of the whole message must be a 0
 * terminator.
 *
 * NOTE(review): the lines carrying the function name and the `be`
 * parameter are missing from this listing.
 *
 * @param cls unused
 * @return #GNUNET_YES if the message is well-formed, #GNUNET_SYSERR if not
 */
6590 static int
6592  void *cls,
6594 {
  /* Bytes that follow the fixed part of the message. */
6595  uint16_t size = ntohs (be->header.size) - sizeof(*be);
6596  const struct GNUNET_MessageHeader *inbox =
6597  (const struct GNUNET_MessageHeader *) &be[1];
6598  const char *is;
6599  uint16_t isize;
6600 
6601  (void) cls;
  /* The inner message must leave room for the trailing string.
     NOTE(review): inbox->size is read without first checking that `size`
     covers a full message header - TODO confirm the caller ensures this. */
6602  if (ntohs (inbox->size) >= size)
6603  {
6604  GNUNET_break_op (0);
6605  return GNUNET_SYSERR;
6606  }
6607  isize = ntohs (inbox->size);
  /* `is` points just past the inner message; `size` becomes the number of
     bytes remaining after it. */
6608  is = ((const char *) inbox) + isize;
6609  size -= isize;
6610  if ('\0' != is[size - 1])
6611  {
6612  GNUNET_break_op (0);
6613  return GNUNET_SYSERR;
6614  }
6615  return GNUNET_YES;
6616 }
6617 
6618 
/**
 * Handles a backchannel encapsulation message `be`: finds the local
 * communicator client whose address prefix equals the string that follows
 * the inner message and forwards the inner message to it.  If no such
 * communicator is connected the message is dropped.
 *
 * NOTE(review): the lines carrying the function name and the `be`
 * parameter are missing from this listing, as are the heads of the
 * logging/statistics calls and of the envelope creation.
 *
 * @param cls the `struct CommunicatorMessageContext`
 */
6627 static void
6629  void *cls,
6631 {
6632  struct CommunicatorMessageContext *cmc = cls;
6633  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
6634  struct GNUNET_MQ_Envelope *env;
6635  struct TransportClient *tc;
6636  const struct GNUNET_MessageHeader *inbox =
6637  (const struct GNUNET_MessageHeader *) &be[1];
6638  uint16_t isize = ntohs (inbox->size);
  /* The target communicator's name starts right after the inner message. */
6639  const char *target_communicator = ((const char *) inbox) + isize;
6640  char *sender;
6641  char *self;
6642 
  /* NOTE(review): `sender` and `self` are allocated here for logging; no
     release of either is visible on any path in this listing - TODO check
     for a leak. */
6643  GNUNET_asprintf (&sender,
6644  "%s",
6645  GNUNET_i2s (&cmc->im.sender));
6646  GNUNET_asprintf (&self,
6647  "%s",
6649 
6650  /* Find client providing this communicator */
6651  for (tc = clients_head; NULL != tc; tc = tc->next)
6652  if ((CT_COMMUNICATOR == tc->type) &&
6653  (0 ==
6654  strcmp (tc->details.communicator.address_prefix, target_communicator)))
6655  break;
6656  if (NULL == tc)
6657  {
6658  char *stastr;
6659 
6660  GNUNET_asprintf (
6661  &stastr,
6662  "# Backchannel message dropped: target communicator `%s' unknown",
6663  target_communicator);
6665  GNUNET_free (stastr);
6666  finish_cmc_handling (cmc);
6667  return;
6668  }
6669  /* Finally, deliver backchannel message to communicator */
6671  "Delivering backchannel message from %s to %s of type %u to %s\n",
6672  sender,
6673  self,
6674  ntohs (inbox->type),
6675  target_communicator);
6677  cbi,
6678  isize,
6680  cbi->pid = cmc->im.sender;
6681  memcpy (&cbi[1], inbox, isize);
6682  GNUNET_MQ_send (tc->mq, env);
6683  finish_cmc_handling (cmc);
6684 }
6685 
6686 
/**
 * Task callback for a `struct DistanceVector`: walks the hops starting at
 * `dv->dv_head`; if no hop remains afterwards the whole route is released
 * via #free_dv_route, otherwise `dv->timeout_task` is assigned again.
 *
 * NOTE(review): the condition that ends the loop early, the statement
 * applied to each visited hop and the right-hand side of the final
 * assignment are missing from this listing.
 *
 * @param cls the `struct DistanceVector`
 */
6696 static void
6697 path_cleanup_cb (void *cls)
6698 {
6699  struct DistanceVector *dv = cls;
6700  struct DistanceVectorHop *pos;
6701 
  /* The task is running, so its handle is no longer valid. */
6702  dv->timeout_task = NULL;
6703  while (NULL != (pos = dv->dv_head))
6704  {
6705  GNUNET_assert (dv == pos->dv);
6707  break;
6709  }
  /* pos is NULL only if the loop ran until the list was empty. */
6710  if (NULL == pos)
6711  {
6712  free_dv_route (dv);
6713  return;
6714  }
6715  dv->timeout_task =
6717 }
6718 
6719 
/**
 * Processes the two caches (`ring_buffer` and `ring_buffer_dv`) for the
 * peer of virtual link `vl`: entries whose sender/target equals
 * `vl->target` are handled now, all other entries are kept.
 *
 * Both sections follow the same scheme: iterate over the stored entries
 * starting at `tail`, handle matching ones, collect the rest in a local
 * copy array while counting them in the head index, then copy the
 * survivors back.
 *
 * NOTE(review): the line carrying the function name and the `vl`
 * parameter is missing from this listing, as are the conditions guarding
 * both sections, the heads of the logging calls, the call that finishes
 * handling a cached raw message and several statements of the DV section.
 */
6720 static void
6722 {
6723 
6724  const struct GNUNET_PeerIdentity target = vl->target;
6725 
6726 
6728  {
6729  struct RingBufferEntry *ring_buffer_copy[RING_BUFFER_SIZE];
  /* When the buffer is full the oldest entry sits at ring_buffer_head,
     otherwise iteration starts at index 0. */
6730  unsigned int tail = GNUNET_YES == is_ring_buffer_full ? ring_buffer_head :
6731  0;
6732  unsigned int head = GNUNET_YES == is_ring_buffer_full ? RING_BUFFER_SIZE :
6734  struct GNUNET_TRANSPORT_IncomingMessage im;
6735  struct CommunicatorMessageContext *cmc;
6736  struct RingBufferEntry *rbe;
6737  struct GNUNET_MessageHeader *mh;
6738 
6740  "Sending from ring buffer, which has %u items\n",
6742 
  /* From here on ring_buffer_head counts the entries that are kept. */
6743  ring_buffer_head = 0;
6744  for (unsigned int i = 0; i < head; i++)
6745  {
6746  rbe = ring_buffer[(i + tail) % RING_BUFFER_SIZE];
6747  cmc = rbe->cmc;
6748  mh = rbe->mh;
6749 
6750  im = cmc->im;
6751  // mh = cmc->mh;
6753  "Sending to ring buffer target %s using vl target %s\n",
6754  GNUNET_i2s (&im.sender),
6755  GNUNET_i2s2 (&target));
6756  if (0 == GNUNET_memcmp (&target, &im.sender))
6757  {
6759  "Finish handling message of type %u and size %u\n",
6760  (unsigned int) ntohs (mh->type),
6761  (unsigned int) ntohs (mh->size));
  /* NOTE(review): only the message copy is released here; no release of
     the RingBufferEntry itself is visible - TODO confirm. */
6763  GNUNET_free (mh);
6764  }
6765  else
6766  {
  /* NOTE(review): kept entries are stored at index i, not at the running
     count ring_buffer_head, while the copy-back loop below reads indices
     0 .. ring_buffer_head - 1.  Once an earlier entry was consumed those
     index sets differ - TODO verify. */
6767  ring_buffer_copy[i] = rbe;
6768  ring_buffer_head++;
6769  }
6770  }
6771 
6774  {
6776  }
6777 
6778  for (unsigned int i = 0; i < ring_buffer_head; i++)
6779  {
6780  ring_buffer[i] = ring_buffer_copy[i];
6782  "ring_buffer_copy[i]->mh->type for i %u %u\n",
6783  i,
6784  ring_buffer_copy[i]->mh->type);
6786  "ring_buffer[i]->mh->type for i %u %u\n",
6787  i,
6788  ring_buffer[i]->mh->type);
6789  }
6790 
6792  "%u items still in ring buffer\n",
6794  }
6795 
6797  {
6798  struct PendingMessage *ring_buffer_dv_copy[RING_BUFFER_SIZE];
6799  struct PendingMessage *pm;
6800  unsigned int tail = GNUNET_YES == is_ring_buffer_dv_full ?
6802  0;
6803  unsigned int head = GNUNET_YES == is_ring_buffer_dv_full ?
6806 
6808  "Sending from ring buffer dv, which has %u items\n",
6810 
6811  ring_buffer_dv_head = 0;
6812  for (unsigned int i = 0; i < head; i++)
6813  {
6815 
6817  "Sending to ring buffer target %s using vl target %s\n",
6818  GNUNET_i2s (&pm->target),
6819  GNUNET_i2s2 (&target));
6820  if (0 == GNUNET_memcmp (&target, &pm->target))
6821  {
6823  "Adding PendingMessage to vl, checking transmission.\n");
6824  pm->vl = vl;
6828  pm);
6829 
6831  }
6832  else
6833  {
  /* NOTE(review): same indexing pattern as in the first section (store at
     i, read back from 0 .. ring_buffer_dv_head - 1) - TODO verify. */
6834  ring_buffer_dv_copy[i] = pm;
6836  }
6837  }
6838 
6840  {
6842  }
6843 
6844  for (unsigned int i = 0; i < ring_buffer_dv_head; i++)
6845  ring_buffer_dv[i] = ring_buffer_dv_copy[i];
6846 
6848  "%u items still in ring buffer dv.\n",
6850 
6851  }
6852 }
6853 
6854 
/**
 * Makes the distance-vector route that `hop` belongs to usable through a
 * virtual link to the route's target.
 *
 * If no virtual link to the target exists, one is created, marked
 * confirmed and tied to the route.  If one exists, it is tied to the
 * route and, if it was unconfirmed, marked confirmed.  In both "newly
 * confirmed" cases flow control is considered and cached messages for the
 * link are processed via #send_msg_from_cache.
 *
 * NOTE(review): the line carrying the function name and the `hop`
 * parameter is missing from this listing, as are the heads of the logging
 * calls, the insertion of the new link into `links`, the right-hand sides
 * of the `visibility_task` / `message_uuid_ctr` assignments and the
 * statement that follows each "tell CORE about it" comment.
 */
6862 static void
6864 {
6865  struct DistanceVector *dv = hop->dv;
6866  struct VirtualLink *vl;
6867 
6868  vl = lookup_virtual_link (&dv->target);
6869  if (NULL == vl)
6870  {
6871 
6872  vl = GNUNET_new (struct VirtualLink);
6874  "Creating new virtual link %p to %s using DV!\n",
6875  vl,
6876  GNUNET_i2s (&dv->target));
6877  vl->confirmed = GNUNET_YES;
6878  vl->message_uuid_ctr =
6880  vl->target = dv->target;
6886  links,
6887  &vl->target,
6888  vl,
  /* Link and route reference each other. */
6890  vl->dv = dv;
6891  dv->vl = vl;
6892  vl->visibility_task =
6894  consider_sending_fc (vl);
6895  /* We lacked a confirmed connection to the target
6896  before, so tell CORE about it (finally!) */
6898  send_msg_from_cache (vl);
6899  }
6900  else
6901  {
6902  /* Link was already up, remember dv is also now available and we are done */
6903  vl->dv = dv;
6904  dv->vl = vl;
6905  if (GNUNET_NO == vl->confirmed)
6906  {
6907  vl->confirmed = GNUNET_YES;
6908  vl->visibility_task =
6910  consider_sending_fc (vl);
6911  /* We lacked a confirmed connection to the target
6912  before, so tell CORE about it (finally!) */
6914  send_msg_from_cache (vl);
6915  }
6916  else
6918  "Virtual link to %s could now also use DV!\n",
6919  GNUNET_i2s (&dv->target));
6920  }
6921 }
6922 
6923 
/**
 * Records a distance-vector path of @a path_len peers, where `path[0]`
 * must be ourselves, `path[1]` a direct neighbour and
 * `path[path_len - 1]` the target.
 *
 * A known identical path only has its timeout and validity refreshed.
 * Otherwise a new hop entry is created unless #MAX_DV_PATHS_TO_TARGET
 * shorter paths to the target are already known.
 *
 * NOTE(review): the line carrying the function name and the `path`
 * parameter is missing from this listing, as are the heads of the
 * logging/statistics calls, the timeout computations, the insertion into
 * `dv_routes` and the statements guarded by the two "0 <" conditions.
 *
 * @param path_len number of entries in `path`, at least 3
 * @param network_latency initial aged RTT of a newly created hop entry
 * @param path_valid_until validity end recorded for the path
 * @return #GNUNET_YES if a path was added or refreshed (the log message
 *         for the refresh case says it is forwarded further),
 *         #GNUNET_NO if it was rediscovered too quickly or enough shorter
 *         paths exist,
 *         #GNUNET_SYSERR if the path is malformed or useless
 */
6949 static int
6951  unsigned int path_len,
6952  struct GNUNET_TIME_Relative network_latency,
6953  struct GNUNET_TIME_Absolute path_valid_until)
6954 {
6955  struct DistanceVectorHop *hop;
6956  struct DistanceVector *dv;
6957  struct Neighbour *next_hop;
6958  unsigned int shorter_distance;
6959 
6960  if (path_len < 3)
6961  {
6962  /* what a boring path! not allowed! */
6963  GNUNET_break (0);
6964  return GNUNET_SYSERR;
6965  }
6966  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
6967  next_hop = lookup_neighbour (&path[1]);
6968  if (NULL == next_hop)
6969  {
6970  /* next hop must be a neighbour, otherwise this whole thing is useless! */
6971  GNUNET_break (0);
6972  return GNUNET_SYSERR;
6973  }
6974  for (unsigned int i = 2; i < path_len; i++)
6975  if (NULL != lookup_neighbour (&path[i]))
6976  {
6977  /* Useless path: we have a direct connection to some hop
6978  in the middle of the path, so this one is not even
6979  terribly useful for redundancy */
6981  "Path of %u hops useless: directly link to hop %u (%s)\n",
6982  path_len,
6983  i,
6984  GNUNET_i2s (&path[i]));
6986  "# Useless DV path ignored: hop is neighbour",
6987  1,
6988  GNUNET_NO);
6989  return GNUNET_SYSERR;
6990  }
  /* Routes are indexed by their target, the last peer on the path. */
6991  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
6992  if (NULL == dv)
6993  {
6994  dv = GNUNET_new (struct DistanceVector);
6995  dv->target = path[path_len - 1];
6997  &path_cleanup_cb,
6998  dv);
7001  dv_routes,
7002  &dv->target,
7003  dv,
7005  }
7006  /* Check if we have this path already! */
7007  shorter_distance = 0;
7008  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
7009  pos = pos->next_dv)
7010  {
7011  if (pos->distance < path_len - 3)
7012  shorter_distance++;
7013  /* Note that the distances in 'pos' excludes us (path[0]),
7014  the next_hop (path[1]) and the target so we need to subtract three
7015  and check next_hop explicitly */
7016  if ((pos->distance == path_len - 3) && (pos->next_hop == next_hop))
7017  {
7018  int match = GNUNET_YES;
7019 
7020  for (unsigned int i = 0; i < pos->distance; i++)
7021  {
7022  if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
7023  {
7024  match = GNUNET_NO;
7025  break;
7026  }
7027  }
7028  if (GNUNET_YES == match)
7029  {
7030  struct GNUNET_TIME_Relative last_timeout;
7031 
7032  /* Re-discovered known path, update timeout */
7034  "# Known DV path refreshed",
7035  1,
7036  GNUNET_NO);
  /* Time that was left before the refresh; compared below to decide
     whether the path was rediscovered too quickly. */
7037  last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
7038  pos->timeout =
7040  pos->path_valid_until =
7041  GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
  /* Remove and re-insert: moves the refreshed hop within the route's
     list. */
7042  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
7043  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
7044  if (0 <
7047  if (last_timeout.rel_value_us <
7050  .rel_value_us)
7051  {
7052  /* Some peer send DV learn messages too often, we are learning
7053  the same path faster than it would be useful; do not forward! */
7055  "Rediscovered path too quickly, not forwarding further\n");
7056  return GNUNET_NO;
7057  }
7059  "Refreshed known path to %s valid until %s, forwarding further\n",
7060  GNUNET_i2s (&dv->target),
7062  pos->path_valid_until));
7063  return GNUNET_YES;
7064  }
7065  }
7066  }
7067  /* Count how many shorter paths we have (incl. direct
7068  neighbours) before simply giving up on this one! */
7069  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
7070  {
7071  /* We have a shorter path already! */
7073  "Have many shorter DV paths %s, not forwarding further\n",
7074  GNUNET_i2s (&dv->target));
7075  return GNUNET_NO;
7076  }
7077  /* create new DV path entry */
7079  "Discovered new DV path to %s valid until %s\n",
7080  GNUNET_i2s (&dv->target),
7081  GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
  /* The hop entry and the intermediate peers of its path live in one
     allocation; path_len - 3 excludes us, the next hop and the target. */
7082  hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
7083  + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
7084  hop->next_hop = next_hop;
7085  hop->dv = dv;
7086  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
7087  memcpy (&hop[1],
7088  &path[2],
7089  sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
7091  hop->path_valid_until = path_valid_until;
7092  hop->distance = path_len - 3;
7093  hop->pd.aged_rtt = network_latency;
  /* The hop is linked into two lists: the route's and the next hop's. */
7094  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
7095  GNUNET_CONTAINER_MDLL_insert (neighbour,
7096  next_hop->dv_head,
7097  next_hop->dv_tail,
7098  hop);
7099  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
7101  return GNUNET_YES;
7102 }
7103 
7104 
7112 static int
7113 check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7114 {
7115  uint16_t size = ntohs (dvl->header.size);
7116  uint16_t num_hops = ntohs (dvl->num_hops);
7117  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
7118 
7119  (void) cls;
7120  if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
7121  {
7122  GNUNET_break_op (0);
7123  return GNUNET_SYSERR;
7124  }
7125  if (num_hops > MAX_DV_HOPS_ALLOWED)
7126  {
7127  GNUNET_break_op (0);
7128  return GNUNET_SYSERR;
7129  }
7130  for (unsigned int i = 0; i < num_hops; i++)
7131  {
7132  if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
7133  {
7134  GNUNET_break_op (0);
7135  return GNUNET_SYSERR;
7136  }
7137  if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
7138  {
7139  GNUNET_break_op (0);
7140  return GNUNET_SYSERR;
7141  }
7142  }
7143  return GNUNET_YES;
7144 }
7145 
7146 
/**
 * Build a copy of a DV learn message that is extended by one path entry
 * for this peer, and pass it on towards @a next_hop.
 *
 * The outgoing message carries @a nhops + 1 path entries: the @a nhops
 * entries from @a hops, followed by a new entry whose hop is
 * #GST_my_identity.
 *
 * NOTE(review): several statements of this function are absent from
 * this listing (e.g. the log call, the consumers of `dhp` and `nnd`,
 * and the names of the two send calls); the comments below describe
 * only what is visible.
 *
 * @param next_hop peer the extended message is addressed to; also
 *        stored as `succ` in the per-hop data block
 * @param msg received message whose `init_sig`, `initiator`,
 *        `challenge` and `monotonic_time` are copied over
 * @param bi_history value written (network byte order) into the
 *        `bidirectional` field of the outgoing message
 * @param nhops number of entries in @a hops
 * @param hops path entries copied in front of our own entry
 * @param in_time NOTE(review): not referenced in the visible lines;
 *        presumably feeds the non-network delay -- confirm
 */
7158 static void
7159 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
7160  const struct TransportDVLearnMessage *msg,
7161  uint16_t bi_history,
7162  uint16_t nhops,
7163  const struct DVPathEntryP *hops,
7164  struct GNUNET_TIME_Absolute in_time)
7165 {
7166  struct Neighbour *n;
7167  struct VirtualLink *vl;
7168  struct DVPathEntryP *dhops;
  /* Stack buffer sized at run time: fixed message part plus
     nhops + 1 path entries (the incoming path and our own entry). */
7169  char buf[sizeof(struct TransportDVLearnMessage)
7170  + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
7171  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
7172  struct GNUNET_TIME_Relative nnd;
7173 
7174  /* compute message for forwarding */
7176  "Forwarding DV learn message originating from %s to %s\n",
7177  GNUNET_i2s (&msg->initiator),
7178  GNUNET_i2s2 (next_hop));
  /* multi-byte header fields are stored in network byte order */
7181  fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
7182  + (nhops + 1) * sizeof(struct DVPathEntryP));
7183  fwd->num_hops = htons (nhops + 1);
7184  fwd->bidirectional = htons (bi_history);
7187  msg->non_network_delay));
  /* the initiator's part of the message is passed on unchanged */
7189  fwd->init_sig = msg->init_sig;
7190  fwd->initiator = msg->initiator;
7191  fwd->challenge = msg->challenge;
7192  fwd->monotonic_time = msg->monotonic_time;
  /* path entries start directly behind the fixed message part */
7193  dhops = (struct DVPathEntryP *) &fwd[1];
7194  GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
  /* append ourselves as the last hop of the path */
7195  dhops[nhops].hop = GST_my_identity;
7196  {
  /* Per-hop data block: the predecessor is the initiator when the
     incoming path is empty, otherwise the last hop of the incoming
     path; the successor is the peer we forward to.
     NOTE(review): presumably this block is signed into
     dhops[nhops].hop_sig -- the call itself is not visible here. */
7197  struct DvHopPS dhp = {
7199  .purpose.size = htonl (sizeof(dhp)),
7200  .pred = (0 == nhops) ? msg->initiator : dhops[nhops - 1].hop,
7201  .succ = *next_hop,
7202  .challenge = msg->challenge
7203  };
7205  &dhp,
7206  &dhops[nhops].hop_sig);
7207  }
7208  /*route_control_message_without_fc (next_hop,
7209  &fwd->header,
7210  RMO_UNCONFIRMED_ALLOWED);*/
  /* Prefer a confirmed virtual link to next_hop; otherwise fall back to
     the neighbour entry. If neither exists, nothing is done in the
     visible code. */
7211  vl = lookup_virtual_link (next_hop);
7212  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
7213  {
7215  &fwd->header,
7217  }
7218  else
7219  {
7220  /* Use route via neighbour */
7221  n = lookup_neighbour (next_hop);
7222  if (NULL != n)
7224  n,
7225  &fwd->header,
7227  }
7228 }
7229 
7230 
/**
 * Check the initiator's signature of a DV learn message.
 *
 * A `struct DvInitPS` is built from @a sender_monotonic_time and
 * `*challenge`, and @a init_sig is checked over it against the public
 * key of @a init.
 *
 * NOTE(review): the function name line, the declaration of the
 * `challenge` parameter, the signature-purpose constant and the name of
 * the verification call are absent from this listing.
 *
 * @param sender_monotonic_time time value placed into the signed block
 * @param init peer whose `public_key` the signature is checked against
 * @param init_sig signature to check
 * @return #GNUNET_OK if the check returned #GNUNET_OK, otherwise
 *         #GNUNET_SYSERR (after a #GNUNET_break_op)
 */
7240 static int
7242  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
7243  const struct GNUNET_PeerIdentity *init,
7245  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
7246 {
  /* block the signature is checked over; its size field is stored in
     network byte order */
7247  struct DvInitPS ip = { .purpose.purpose = htonl (
7249  .purpose.size = htonl (sizeof(ip)),
7250  .monotonic_time = sender_monotonic_time,
7251  .challenge = *challenge };
7252 
7253  if (
7254  GNUNET_OK !=
7256  &ip,
7257  init_sig,
7258  &init->public_key))
7259  {
  /* any result other than GNUNET_OK is flagged and rejected */
7260  GNUNET_break_op (0);
7261  return GNUNET_SYSERR;
7262  }
7263  return GNUNET_OK;
7264 }
7265 
7266 
/* Body of a context structure shared by the two peer callbacks that
   follow it in this file.
   NOTE(review): the header line is not visible in this listing;
   presumably this is `struct NeighbourSelectionContext`, since members
   of these names are accessed through `nsc->` below. The callbacks also
   use `dvl`, `selections` and `in_time`, whose declarations are missing
   here. */
7271 {
7276 
  /* path entries; a peer equal to any `hops[i].hop` is skipped by the
     callbacks */
7280  const struct DVPathEntryP *hops;
7281 
7286 
7291 
  /* running count of peers that were not skipped; incremented once per
     such peer by each callback */
7295  unsigned int num_eligible;
7296 
  /* number of entries of `selections` that the transmission callback
     scans */
7300  unsigned int num_selections;
7301 
  /* number of entries in `hops` */
7305  uint16_t nhops;
7306 
  /* handed through unchanged when a message is forwarded to a selected
     peer */
7310  uint16_t bi_history;
7311 };
7312 
7313 
/**
 * Per-peer callback that counts the peers eligible as forwarding
 * targets.
 *
 * A peer is eligible when it is neither the initiator of the message
 * (`nsc->dvl->initiator`) nor one of the `nsc->nhops` hops in
 * `nsc->hops`. Each eligible peer increments `nsc->num_eligible`.
 *
 * NOTE(review): the line with the function name and the `cls`
 * parameter is absent from this listing; presumably this is an
 * iterator callback over a peer map -- confirm.
 *
 * @param pid identity of the peer being visited
 * @param value unused
 * @return always #GNUNET_YES
 */
7322 static int
7324  const struct GNUNET_PeerIdentity *pid,
7325  void *value)
7326 {
7327  struct NeighbourSelectionContext *nsc = cls;
7328 
7329  (void) value;
7330  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7331  return GNUNET_YES; /* skip initiator */
  /* the early return inside this loop skips peers already on the path */
7332  for (unsigned int i = 0; i < nsc->nhops; i++)
7333  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7334  return GNUNET_YES;
7335  /* skip peers on path */
7336  nsc->num_eligible++;
7337  return GNUNET_YES;
7338 }
7339 
7340 
/**
 * Per-peer callback that forwards the message to the peers that were
 * selected by index.
 *
 * Peers are filtered exactly as in the counting callback (the initiator
 * and peers on the path are skipped). `nsc->num_eligible` serves as the
 * running index of the current eligible peer: if that index occurs
 * among the first `nsc->num_selections` entries of `nsc->selections`,
 * the message is handed on once for this peer.
 *
 * NOTE(review): the function name line, the log call and the name of
 * the forwarding call are absent from this listing; the visible
 * arguments (`dvl`, `bi_history`, `nhops`, `hops`, `in_time`) match the
 * parameter order of forward_dv_learn() -- confirm.
 *
 * @param pid identity of the peer being visited
 * @param value unused
 * @return always #GNUNET_YES
 */
7351 static int
7353  const struct GNUNET_PeerIdentity *pid,
7354  void *value)
7355 {
7356  struct NeighbourSelectionContext *nsc = cls;
7357 
7359  "transmission %s\n",
7360  GNUNET_i2s (pid));
7361  (void) value;
7362  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7363  return GNUNET_YES; /* skip initiator */
7364  for (unsigned int i = 0; i < nsc->nhops; i++)
7365  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7366  return GNUNET_YES;
7367  /* skip peers on path */
  /* forward at most once per peer: stop at the first matching index */
7368  for (unsigned int i = 0; i < nsc->num_selections; i++)
7369  {
7370  if (nsc->selections[i] == nsc->num_eligible)
7371  {
7373  nsc->dvl,
7374  nsc->bi_history,
7375  nsc->nhops,
7376  nsc->hops,
7377  nsc->in_time);
7378  break;
7379  }
7380  }
  /* advance the eligible-peer index whether or not we forwarded */
7381  nsc->num_eligible++;
7382  return GNUNET_YES;
7383 }
7384 
7385 
7429 static unsigned int
7430 calculate_fork_degree (unsigned int hops_taken,
7431  unsigned int neighbour_count,
7432  unsigned int eligible_count)
7433 {
7434  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
7435  double eligible_ratio =
7436  ((double) eligible_count) / ((double) neighbour_count);
7437  double boost_factor = eligible_ratio * eligible_ratio;
7438  unsigned int rnd;
7439  double left;
7440 
7441  if (hops_taken >= 64)
7442  {
7443  GNUNET_break (0);
7444  return 0; /* precaution given bitshift below */
7445  }
7446  for (unsigned int i = 1; i < hops_taken; i++)
7447  {
7448  /* For each hop, subtract the expected number of targets
7449  reached at distance d (so what remains divided by 2^d) */
7450  target_total -= (target_total * boost_factor / (1LLU << i));
7451  }
7452  rnd =
7453  (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
7454  /* round up or down probabilistically depending on how close we were
7455  when floor()ing to rnd */
7456  left = target_total - (double) rnd;
7457  if (UINT32_MAX * left >
7459  rnd++; /* round up */
7461  "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
7462  hops_taken,
7463  rnd,
7464  eligible_count,
7465  neighbour_count);