GNUnet  0.16.x
gnunet-service-tng.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
75 #include "platform.h"
76 #include "gnunet_util_lib.h"
80 #include "gnunet_hello_lib.h"
81 #include "gnunet_signatures.h"
82 #include "transport.h"
83 
87 #define MAX_FC_RETRANSMIT_COUNT 1000
88 
93 #define MAX_CUMMULATIVE_ACKS 64
94 
107 #define FC_NO_CHANGE_REPLY_PROBABILITY 8
108 
113 #define IN_PACKET_SIZE_WITHOUT_MTU 128
114 
119 #define GOODPUT_AGING_SLOTS 4
120 
125 #define DEFAULT_WINDOW_SIZE (128 * 1024)
126 
135 #define MAX_INCOMING_REQUEST 16
136 
141 #define MAX_DV_DISCOVERY_SELECTION 16
142 
151 #define RECV_WINDOW_SIZE 4
152 
160 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
161 
165 #define MAX_DV_HOPS_ALLOWED 16
166 
171 #define MAX_DV_LEARN_PENDING 64
172 
176 #define MAX_DV_PATHS_TO_TARGET 3
177 
183 #define DELAY_WARN_THRESHOLD \
184  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
185 
190 #define DV_FORWARD_TIMEOUT \
191  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
192 
196 #define DEFAULT_ACK_WAIT_DURATION \
197  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
198 
204 #define DV_QUALITY_RTT_THRESHOLD \
205  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
206 
211 #define DV_PATH_VALIDITY_TIMEOUT \
212  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
213 
218 #define BACKCHANNEL_INACTIVITY_TIMEOUT \
219  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
220 
225 #define DV_PATH_DISCOVERY_FREQUENCY \
226  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
227 
231 #define EPHEMERAL_VALIDITY \
232  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
233 
237 #define REASSEMBLY_EXPIRATION \
238  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
239 
244 #define FAST_VALIDATION_CHALLENGE_FREQ \
245  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
246 
250 #define MAX_VALIDATION_CHALLENGE_FREQ \
251  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
252 
258 #define ACK_CUMMULATOR_TIMEOUT \
259  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
260 
265 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
266 
271 #define DV_LEARN_QUALITY_THRESHOLD 100
272 
276 #define MAX_ADDRESS_VALID_UNTIL \
277  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
278 
282 #define ADDRESS_VALIDATION_LIFETIME \
283  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
284 
291 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
292 
299 #define VALIDATION_RTT_BUFFER_FACTOR 3
300 
307 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
308 
314 #define QUEUE_LENGTH_LIMIT 32
315 
316 
318 
323 {
328  uint64_t uuid GNUNET_PACKED;
329 };
330 
331 
336 {
340  struct GNUNET_Uuid value;
341 };
342 
347 {
352 
353  /* Followed by *another* message header which is the message to
354  the communicator */
355 
356  /* Followed by a 0-terminated name of the communicator */
357 };
358 
359 
364 {
369 
385 
390 
396 };
397 
398 
404 {
409 
415 
427 
428  /* Followed by a `struct GNUNET_MessageHeader` with a message
429  for the target peer */
430 };
431 
432 
438 {
443 
451 
458 };
459 
460 
465 {
473 
478 };
479 
480 
489 {
494 
500 
501  /* followed by any number of `struct TransportCummulativeAckPayloadP`
502  messages providing ACKs */
503 };
504 
505 
510 {
515 
520 
525 
534 
539  struct MessageUUIDP msg_uuid;
540 };
541 
542 
560 struct DvInitPS
561 {
566 
580 
585 };
586 
587 
604 struct DvHopPS
605 {
610 
614  struct GNUNET_PeerIdentity pred;
615 
619  struct GNUNET_PeerIdentity succ;
620 
625 };
626 
627 
633 {
637  struct GNUNET_PeerIdentity hop;
638 
644 };
645 
646 
661 {
666 
672 
682 
689 
703 
709 
714 
719 
720  /* Followed by @e num_hops `struct DVPathEntryP` values,
721  excluding the initiator of the DV trace; the last entry is the
722  current sender; the current peer must not be included. */
723 };
724 
725 
749 {
754 
758  unsigned int without_fc;
759 
767 
774 
780 
786  struct GNUNET_ShortHashCode iv;
787 
793  struct GNUNET_HashCode hmac;
794 
804 
805  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
806  excluding the @e origin and the current peer, the last must be
807  the ultimate target; if @e num_hops is zero, the receiver of this
808  message is the ultimate target. */
809 
810  /* Followed by encrypted, variable-size payload, which
811  must begin with a `struct TransportDVBoxPayloadP` */
812 
813  /* Followed by the actual message, which itself must not be a
814  a DV_LEARN or DV_BOX message! */
815 };
816 
817 
823 {
828 
833 
838 
844 };
845 
846 
852 {
857 
863 
868 };
869 
870 
876 {
881 
886 
892 
897 
902  struct GNUNET_TIME_AbsoluteNBO origin_time;
903 
908  struct GNUNET_TIME_RelativeNBO validity_duration;
909 };
910 
911 
921 {
926 
934  uint32_t seq GNUNET_PACKED;
935 
941 
948 
958 
968 };
969 
970 
972 
973 
978 {
982  CT_NONE = 0,
983 
987  CT_CORE = 1,
988 
993 
998 
1002  CT_APPLICATION = 4
1003 };
1004 
1005 
1011 {
1016 
1021 
1026 
1031 
1037  RMO_REDUNDANT = 4
1038 };
1039 
1040 
1045 {
1050 
1055 
1060 
1066 };
1067 
1068 
1074 {
1078  uint64_t bytes_sent;
1079 
1084  uint64_t bytes_received;
1085 };
1086 
1087 
1092 {
1097 
1103 
1108  unsigned int last_age;
1109 };
1110 
1111 
1115 struct TransportClient;
1116 
1120 struct Neighbour;
1121 
1126 struct DistanceVector;
1127 
1132 struct Queue;
1133 
1137 struct PendingMessage;
1138 
1142 struct DistanceVectorHop;
1143 
1152 struct VirtualLink;
1153 
1154 
1160 {
1166 
1172 
1177 
1181  struct GNUNET_TRANSPORT_IncomingMessage im;
1182 
1187  uint16_t total_hops;
1188 };
1189 
1190 
1195 {
1200 
1205 
1209  struct VirtualLink *vl;
1210 
1214  uint16_t size;
1215 
1222  uint16_t isize;
1223 };
1224 
1225 
1230 {
1235  struct MessageUUIDP msg_uuid;
1236 
1241 
1246 
1254  uint8_t *bitfield;
1255 
1260 
1266 
1270  uint16_t msg_size;
1271 
1276  uint16_t msg_missing;
1277 
1278  /* Followed by @e msg_size bytes of the (partially) defragmented original
1279  * message */
1280 
1281  /* Followed by @e bitfield data */
1282 };
1283 
1284 
1294 {
1298  struct GNUNET_PeerIdentity target;
1299 
1306 
1313 
1318 
1324 
1330 
1335 
1340 
1345 
1350 
1358 
1364 
1368  unsigned int fc_retransmit_count;
1369 
1374  unsigned int confirmed;
1375 
1379  struct Neighbour *n;
1380 
1385 
1392 
1399 
1408 
1414 
1420 
1429 
1437 
1444 
1453 
1466 
1472 
1479 
1490 
1495  uint32_t fc_seq_gen;
1496 
1502  uint32_t last_fc_seq;
1503 
1516 };
1517 
1518 
1523 {
1529 
1535 
1542 
1549 
1556 
1563 
1570 
1577 
1582 
1588 
1594 
1599  struct Queue *queue;
1600 
1605 
1609  uint16_t message_size;
1610 };
1611 
1612 
1617 {
1622 
1627 
1632 
1637 
1642 
1647 
1652 
1657 
1663  const struct GNUNET_PeerIdentity *path;
1664 
1670 
1679 
1683  struct PerformanceData pd;
1684 
1690  unsigned int distance;
1691 };
1692 
1693 
1699 {
1703  struct GNUNET_PeerIdentity target;
1704 
1709 
1714 
1719 
1724  struct VirtualLink *vl;
1725 
1731 
1736 
1741 
1746 
1751 };
1752 
1753 
1763 struct QueueEntry
1764 {
1768  struct QueueEntry *next;
1769 
1773  struct QueueEntry *prev;
1774 
1778  struct Queue *queue;
1779 
1784 
1788  uint64_t mid;
1789 };
1790 
1791 
1796 struct Queue
1797 {
1802 
1807 
1812 
1817 
1822 
1827 
1832 
1837 
1842 
1847 
1851  const char *address;
1852 
1856  unsigned int unlimited_length;
1857 
1863 
1872 
1876  struct PerformanceData pd;
1877 
1882  uint64_t mid_gen;
1883 
1887  uint32_t qid;
1888 
1892  uint32_t mtu;
1893 
1898 
1903 
1907  unsigned int queue_length;
1908 
1912  uint64_t q_capacity;
1913 
1917  uint32_t priority;
1918 
1922  enum GNUNET_NetworkType nt;
1923 
1928 
1933  int idle;
1934 };
1935 
1936 
1937 
1938 
1939 
1943 struct Neighbour
1944 {
1948  struct GNUNET_PeerIdentity pid;
1949 
1955 
1961 
1966 
1971 
1977 
1983 
1988  struct VirtualLink *vl;
1989 
1995 
2001 };
2002 
2003 
2009 {
2014 
2019 
2024 
2028  struct GNUNET_PeerIdentity pid;
2029 };
2030 
2031 
2035 struct PeerRequest
2036 {
2040  struct GNUNET_PeerIdentity pid;
2041 
2046 
2051 
2058 
2063 };
2064 
2065 
2070 {
2075 
2080 
2085 
2089  PMT_DV_BOX = 3
2090 };
2091 
2092 
2119 struct PendingMessage
2120 {
2125 
2130 
2135 
2140 
2146 
2152 
2157 
2162 
2168 
2172  struct VirtualLink *vl;
2173 
2182  struct QueueEntry *qe;
2183 
2188 
2193 
2198 
2203 
2208 
2213 
2218  struct MessageUUIDP msg_uuid;
2219 
2224  unsigned long long logging_uuid;
2225 
2229  enum PendingMessageType pmt;
2230 
2236 
2241 
2245  uint16_t bytes_msg;
2246 
2250  uint16_t frag_off;
2251 
2255  int16_t msg_uuid_set;
2256 
2257  /* Followed by @e bytes_msg to transmit */
2258 };
2259 
2260 
2265 {
2271 
2276 };
2277 
2278 
2284 {
2288  struct GNUNET_PeerIdentity target;
2289 
2294 
2301 
2306 
2312  uint32_t ack_counter;
2313 
2317  unsigned int num_acks;
2318 };
2319 
2320 
2325 {
2330 
2335 
2340 
2344  const char *address;
2345 
2350 
2355 
2361 
2365  uint32_t aid;
2366 
2370  enum GNUNET_NetworkType nt;
2371 };
2372 
2373 
2378 {
2383 
2388 
2393 
2398 
2402  enum ClientType type;
2403 
2404  union
2405  {
2409  struct
2410  {
2416 
2421  } core;
2422 
2426  struct
2427  {
2433  struct GNUNET_PeerIdentity peer;
2434 
2440 
2441 
2445  struct
2446  {
2452 
2457 
2462 
2468 
2474 
2480  unsigned int total_queue_length;
2481 
2487 
2491  struct
2492  {
2500 };
2501 
2502 
2508 {
2513  struct GNUNET_PeerIdentity pid;
2514 
2522 
2528 
2535  struct GNUNET_TIME_Absolute first_challenge_use;
2536 
2543  struct GNUNET_TIME_Absolute last_challenge_use;
2544 
2552  struct GNUNET_TIME_Absolute next_challenge;
2553 
2562  struct GNUNET_TIME_Relative challenge_backoff;
2563 
2568  struct GNUNET_TIME_Relative validation_rtt;
2569 
2577  struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2578 
2582  char *address;
2583 
2589  struct GNUNET_CONTAINER_HeapNode *hn;
2590 
2596 
2602  uint32_t last_window_consum_limit;
2603 
2608  int awaiting_queue;
2609 };
2610 
2611 
2619 {
2623  struct GNUNET_PeerIdentity pid;
2624 
2629 
2634 
2639 
2645 
2650 
2656 
2662 
2667  size_t body_size;
2668 };
2669 
2670 
2675 
2680 
2685 
2690 
2694 static struct GNUNET_PeerIdentity GST_my_identity;
2695 
2700 
2706 
2712 
2718 
2724 
2730 
2736 
2742 
2747 
2751 static struct LearnLaunchEntry *lle_head = NULL;
2752 
2756 static struct LearnLaunchEntry *lle_tail = NULL;
2757 
2764 
2769 
2774 
2779 
2785 
2791 
2797 static struct IncomingRequest *ir_head;
2798 
2802 static struct IncomingRequest *ir_tail;
2803 
2807 static unsigned int ir_total;
2808 
2812 static unsigned long long logging_uuid_gen;
2813 
2818 static unsigned int pa_count;
2819 
2829 
2834 static int in_shutdown;
2835 
2846 static unsigned int
2848 {
2849  struct GNUNET_TIME_Absolute now;
2850 
2851  now = GNUNET_TIME_absolute_get ();
2852  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2853 }
2854 
2855 
2861 static void
2863 {
2865  GNUNET_assert (ir_total > 0);
2866  ir_total--;
2868  ir->wc = NULL;
2869  GNUNET_free (ir);
2870 }
2871 
2872 
2878 static void
2880 {
2881  struct Queue *q = pa->queue;
2882  struct PendingMessage *pm = pa->pm;
2883  struct DistanceVectorHop *dvh = pa->dvh;
2884 
2886  "free_pending_acknowledgement\n");
2887  if (NULL != q)
2888  {
2889  GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
2890  pa->queue = NULL;
2891  }
2892  if (NULL != pm)
2893  {
2895  "remove pa from message\n");
2897  "remove pa from message %llu\n",
2898  pm->logging_uuid);
2900  "remove pa from message %u\n",
2901  pm->pmt);
2903  "remove pa from message %s\n",
2904  GNUNET_uuid2s (&pa->ack_uuid.value));
2905  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2906  pa->pm = NULL;
2907  }
2908  if (NULL != dvh)
2909  {
2910  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
2911  pa->queue = NULL;
2912  }
2915  &pa->ack_uuid.value,
2916  pa));
2917  GNUNET_free (pa);
2918 }
2919 
2920 
2929 static void
2931 {
2932  struct PendingMessage *frag;
2933 
2934  while (NULL != (frag = root->head_frag))
2935  {
2936  struct PendingAcknowledgement *pa;
2937 
2938  free_fragment_tree (frag);
2939  while (NULL != (pa = frag->pa_head))
2940  {
2941  GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
2942  pa->pm = NULL;
2943  }
2944  GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
2945  if (NULL != frag->qe)
2946  {
2947  GNUNET_assert (frag == frag->qe->pm);
2948  frag->qe->pm = NULL;
2950  frag->qe->queue->queue_tail,
2951  frag->qe);
2953  "Removing QueueEntry MID %llu from queue\n",
2954  frag->qe->mid);
2955  GNUNET_free (frag->qe);
2956  }
2958  "Free frag %p\n",
2959  frag);
2960  GNUNET_free (frag);
2961  }
2962 }
2963 
2964 
2972 static void
2974 {
2975  struct TransportClient *tc = pm->client;
2976  struct VirtualLink *vl = pm->vl;
2977  struct PendingAcknowledgement *pa;
2978 
2980  "Freeing pm %p\n",
2981  pm);
2982  if (NULL != tc)
2983  {
2985  tc->details.core.pending_msg_head,
2986  tc->details.core.pending_msg_tail,
2987  pm);
2988  }
2989  if ((NULL != vl) && (NULL == pm->frag_parent))
2990  {
2992  "Removing pm %lu\n",
2993  pm->logging_uuid);
2995  vl->pending_msg_head,
2996  vl->pending_msg_tail,
2997  pm);
2998  }
2999  while (NULL != (pa = pm->pa_head))
3000  {
3001  if (NULL == pa)
3003  "free pending pa null\n");
3004  if (NULL == pm->pa_tail)
3006  "free pending pa_tail null\n");
3007  if (NULL == pa->prev_pa)
3009  "free pending pa prev null\n");
3010  if (NULL == pa->next_pa)
3012  "free pending pa next null\n");
3013  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3014  pa->pm = NULL;
3015  }
3016 
3018  if (NULL != pm->qe)
3019  {
3020  GNUNET_assert (pm == pm->qe->pm);
3021  pm->qe->pm = NULL;
3022  GNUNET_CONTAINER_DLL_remove (pm->qe->queue->queue_head,
3023  pm->qe->queue->queue_tail,
3024  pm->qe);
3026  "Removing QueueEntry MID %llu from queue\n",
3027  pm->qe->mid);
3028  GNUNET_free (pm->qe);
3029  }
3030  if (NULL != pm->bpm)
3031  {
3032  free_fragment_tree (pm->bpm);
3033  GNUNET_free (pm->bpm);
3034  }
3035  if (NULL == pm)
3037  "free pending pm null\n");
3038  GNUNET_free (pm);
3039 }
3040 
3041 
3047 static void
3049 {
3050  struct VirtualLink *vl = rc->virtual_link;
3051 
3055  rc->msg_uuid.uuid,
3056  rc));
3057  GNUNET_free (rc);
3058 }
3059 
3060 
3066 static void
3068 {
3069  struct VirtualLink *vl = cls;
3070  struct ReassemblyContext *rc;
3071 
3072  vl->reassembly_timeout_task = NULL;
3073  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3074  {
3076  .rel_value_us)
3077  {
3079  continue;
3080  }
3081  GNUNET_assert (NULL == vl->reassembly_timeout_task);
3085  vl);
3086  return;
3087  }
3088 }
3089 
3090 
3099 static int
3100 free_reassembly_cb (void *cls, uint32_t key, void *value)
3101 {
3102  struct ReassemblyContext *rc = value;
3103 
3104  (void) cls;
3105  (void) key;
3107  return GNUNET_OK;
3108 }
3109 
3110 
3116 static void
3118 {
3119  struct PendingMessage *pm;
3120  struct CoreSentContext *csc;
3121 
3123  "free virtual link %p\n",
3124  vl);
3125 
3126  if (NULL != vl->reassembly_map)
3127  {
3130  NULL);
3132  vl->reassembly_map = NULL;
3134  vl->reassembly_heap = NULL;
3135  }
3136  if (NULL != vl->reassembly_timeout_task)
3137  {
3139  vl->reassembly_timeout_task = NULL;
3140  }
3141  while (NULL != (pm = vl->pending_msg_head))
3145  if (NULL != vl->visibility_task)
3146  {
3148  vl->visibility_task = NULL;
3149  }
3150  if (NULL != vl->fc_retransmit_task)
3151  {
3153  vl->fc_retransmit_task = NULL;
3154  }
3155  while (NULL != (csc = vl->csc_head))
3156  {
3158  GNUNET_assert (vl == csc->vl);
3159  csc->vl = NULL;
3160  }
3161  GNUNET_break (NULL == vl->n);
3162  GNUNET_break (NULL == vl->dv);
3163  GNUNET_free (vl);
3164 }
3165 
3166 
3172 static void
3174 {
3175  GNUNET_assert (
3176  GNUNET_YES ==
3179  vs->hn = NULL;
3180  if (NULL != vs->sc)
3181  {
3183  "store cancel\n");
3185  vs->sc = NULL;
3186  }
3187  GNUNET_free (vs->address);
3188  GNUNET_free (vs);
3189 }
3190 
3191 
3198 static struct Neighbour *
3200 {
3202 }
3203 
3204 
3211 static struct VirtualLink *
3213 {
3215 }
3216 
3217 
3222 {
3229 
3233  struct GNUNET_TIME_Relative rtt;
3234 
3239 
3244 
3249 };
3250 
3251 
3260 static void
3262 {
3263  struct Neighbour *n = dvh->next_hop;
3264  struct DistanceVector *dv = dvh->dv;
3265  struct PendingAcknowledgement *pa;
3266 
3267  while (NULL != (pa = dvh->pa_head))
3268  {
3270  pa->dvh = NULL;
3271  }
3272  GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3274  GNUNET_free (dvh);
3275 }
3276 
3277 
3284 static void
3285 check_link_down (void *cls);
3286 
3287 
3293 static void
3295 {
3297  "Informing CORE clients about disconnect from %s\n",
3298  GNUNET_i2s (pid));
3299  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3300  {
3301  struct GNUNET_MQ_Envelope *env;
3302  struct DisconnectInfoMessage *dim;
3303 
3304  if (CT_CORE != tc->type)
3305  continue;
3307  dim->peer = *pid;
3308  GNUNET_MQ_send (tc->mq, env);
3309  }
3310 }
3311 
3312 
3319 static void
3321 {
3322  struct DistanceVectorHop *dvh;
3323 
3324  while (NULL != (dvh = dv->dv_head))
3326  if (NULL == dv->dv_head)
3327  {
3328  struct VirtualLink *vl;
3329 
3330  GNUNET_assert (
3331  GNUNET_YES ==
3333  if (NULL != (vl = dv->vl))
3334  {
3335  GNUNET_assert (dv == vl->dv);
3336  vl->dv = NULL;
3337  if (NULL == vl->n)
3338  {
3340  free_virtual_link (vl);
3341  }
3342  else
3343  {
3346  }
3347  dv->vl = NULL;
3348  }
3349 
3350  if (NULL != dv->timeout_task)
3351  {
3353  dv->timeout_task = NULL;
3354  }
3355  GNUNET_free (dv);
3356  }
3357 }
3358 
3359 
3373 static void
3375  const struct GNUNET_PeerIdentity *peer,
3376  const char *address,
3377  enum GNUNET_NetworkType nt,
3378  const struct MonitorEvent *me)
3379 {
3380  struct GNUNET_MQ_Envelope *env;
3381  struct GNUNET_TRANSPORT_MonitorData *md;
3382  size_t addr_len = strlen (address) + 1;
3383 
3384  env = GNUNET_MQ_msg_extra (md,
3385  addr_len,
3387  md->nt = htonl ((uint32_t) nt);
3388  md->peer = *peer;
3389  md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3390  md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3391  md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3392  md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3393  md->cs = htonl ((uint32_t) me->cs);
3394  md->num_msg_pending = htonl (me->num_msg_pending);
3395  md->num_bytes_pending = htonl (me->num_bytes_pending);
3396  memcpy (&md[1], address, addr_len);
3397  GNUNET_MQ_send (tc->mq, env);
3398 }
3399 
3400 
3410 static void
3412  const char *address,
3413  enum GNUNET_NetworkType nt,
3414  const struct MonitorEvent *me)
3415 {
3416  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3417  {
3418  if (CT_MONITOR != tc->type)
3419  continue;
3420  if (tc->details.monitor.one_shot)
3421  continue;
3422  if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3423  (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3424  continue;
3426  }
3427 }
3428 
3429 
3439 static void *
3441  struct GNUNET_SERVICE_Client *client,
3442  struct GNUNET_MQ_Handle *mq)
3443 {
3444  struct TransportClient *tc;
3445 
3446  (void) cls;
3447  tc = GNUNET_new (struct TransportClient);
3448  tc->client = client;
3449  tc->mq = mq;
3452  "Client %p of type %u connected\n",
3453  tc,
3454  tc->type);
3455  return tc;
3456 }
3457 
3458 
3464 static void
3465 free_neighbour (struct Neighbour *neighbour)
3466 {
3467  struct DistanceVectorHop *dvh;
3468  struct VirtualLink *vl;
3469 
3470  GNUNET_assert (NULL == neighbour->queue_head);
3473  &neighbour->pid,
3474  neighbour));
3476  "Freeing neighbour\n");
3477  while (NULL != (dvh = neighbour->dv_head))
3478  {
3479  struct DistanceVector *dv = dvh->dv;
3480 
3482  if (NULL == dv->dv_head)
3483  free_dv_route (dv);
3484  }
3485  if (NULL != neighbour->get)
3486  {
3487  GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
3488  neighbour->get = NULL;
3489  }
3490  if (NULL != neighbour->sc)
3491  {
3493  "store cancel\n");
3494  GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3495  neighbour->sc = NULL;
3496  }
3497  if (NULL != (vl = neighbour->vl))
3498  {
3499  GNUNET_assert (neighbour == vl->n);
3500  vl->n = NULL;
3501  if (NULL == vl->dv)
3502  {
3505  }
3506  else
3507  {
3510  }
3511  neighbour->vl = NULL;
3512  }
3513  GNUNET_free (neighbour);
3514 }
3515 
3516 
3523 static void
3525  const struct GNUNET_PeerIdentity *pid)
3526 {
3527  struct GNUNET_MQ_Envelope *env;
3528  struct ConnectInfoMessage *cim;
3529 
3530  GNUNET_assert (CT_CORE == tc->type);
3532  cim->id = *pid;
3533  GNUNET_MQ_send (tc->mq, env);
3534 }
3535 
3536 
3542 static void
3544 {
3546  "Informing CORE clients about connection to %s\n",
3547  GNUNET_i2s (pid));
3548  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3549  {
3550  if (CT_CORE != tc->type)
3551  continue;
3553  }
3554 }
3555 
3556 
3564 static void
3565 transmit_on_queue (void *cls);
3566 
3567 
3571 static unsigned int
3573 {
3574  for (struct Queue *s = queue_head; NULL != s;
3575  s = s->next_client)
3576  {
3577  if (s->tc->details.communicator.address_prefix !=
3578  queue->tc->details.communicator.address_prefix)
3579  {
3581  "queue address %s qid %u compare with queue: address %s qid %u\n",
3582  queue->address,
3583  queue->qid,
3584  s->address,
3585  s->qid);
3586  if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3587  (QUEUE_LENGTH_LIMIT > s->queue_length) )
3588  return GNUNET_YES;
3590  "Lower prio\n");
3591  }
3592  }
3593  return GNUNET_NO;
3594 }
3595 
3596 
3604 static void
3606  struct Queue *queue,
3608 {
3610  queue->tc->details.communicator.
3611  queue_head))
3612  return;
3613 
3614  if (queue->tc->details.communicator.total_queue_length >=
3616  {
3618  "Transmission throttled due to communicator queue limit\n");
3620  GST_stats,
3621  "# Transmission throttled due to communicator queue limit",
3622  1,
3623  GNUNET_NO);
3624  queue->idle = GNUNET_NO;
3625  return;
3626  }
3627  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3628  {
3630  "Transmission throttled due to communicator queue length limit\n");
3632  "# Transmission throttled due to queue queue limit",
3633  1,
3634  GNUNET_NO);
3635  queue->idle = GNUNET_NO;
3636  return;
3637  }
3638  if (0 == queue->q_capacity)
3639  {
3641  "Transmission throttled due to communicator message queue qid %u has capacity %lu.\n",
3642  queue->qid,
3643  queue->q_capacity);
3645  "# Transmission throttled due to message queue capacity",
3646  1,
3647  GNUNET_NO);
3648  queue->idle = GNUNET_NO;
3649  return;
3650  }
3651  /* queue might indeed be ready, schedule it */
3652  if (NULL != queue->transmit_task)
3653  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3654  queue->transmit_task =
3656  queue);
3658  "Considering transmission on queue `%s' QID %llu to %s\n",
3659  queue->address,
3660  (unsigned long long) queue->qid,
3661  GNUNET_i2s (&queue->neighbour->pid));
3662 }
3663 
3664 
3671 static void
3672 check_link_down (void *cls)
3673 {
3674  struct VirtualLink *vl = cls;
3675  struct DistanceVector *dv = vl->dv;
3676  struct Neighbour *n = vl->n;
3677  struct GNUNET_TIME_Absolute dvh_timeout;
3678  struct GNUNET_TIME_Absolute q_timeout;
3679 
3681  "Checking if link is down\n");
3682  vl->visibility_task = NULL;
3683  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3684  if (NULL != dv)
3685  {
3686  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3687  pos = pos->next_dv)
3688  dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3689  pos->path_valid_until);
3690  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3691  {
3692  vl->dv->vl = NULL;
3693  vl->dv = NULL;
3694  }
3695  }
3696  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3697  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3698  q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3699  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3700  {
3701  vl->n->vl = NULL;
3702  vl->n = NULL;
3703  }
3704  if ((NULL == vl->n) && (NULL == vl->dv))
3705  {
3707  free_virtual_link (vl);
3708  return;
3709  }
3710  vl->visibility_task =
3711  GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3712  &check_link_down,
3713  vl);
3714 }
3715 
3716 
3722 static void
3724 {
3725  struct Neighbour *neighbour = queue->neighbour;
3726  struct TransportClient *tc = queue->tc;
3727  struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3729  struct QueueEntry *qe;
3730  int maxxed;
3731  struct PendingAcknowledgement *pa;
3732  struct VirtualLink *vl;
3733 
3735  "Cleaning up queue %u\n", queue->qid);
3736  if (NULL != queue->transmit_task)
3737  {
3738  GNUNET_SCHEDULER_cancel (queue->transmit_task);
3739  queue->transmit_task = NULL;
3740  }
3741  while (NULL != (pa = queue->pa_head))
3742  {
3743  GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3744  pa->queue = NULL;
3745  }
3746 
3747  GNUNET_CONTAINER_MDLL_remove (neighbour,
3748  neighbour->queue_head,
3749  neighbour->queue_tail,
3750  queue);
3752  tc->details.communicator.queue_head,
3753  tc->details.communicator.queue_tail,
3754  queue);
3755  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT <=
3756  tc->details.communicator.
3757  total_queue_length);
3758  while (NULL != (qe = queue->queue_head))
3759  {
3760  GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3761  queue->queue_length--;
3762  tc->details.communicator.total_queue_length--;
3763  if (NULL != qe->pm)
3764  {
3765  GNUNET_assert (qe == qe->pm->qe);
3766  qe->pm->qe = NULL;
3767  }
3768  GNUNET_free (qe);
3769  }
3770  GNUNET_assert (0 == queue->queue_length);
3771  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3772  tc->details.communicator.total_queue_length))
3773  {
3774  /* Communicator dropped below threshold, resume all _other_ queues */
3776  GST_stats,
3777  "# Transmission throttled due to communicator queue limit",
3778  -1,
3779  GNUNET_NO);
3780  for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3781  s = s->next_client)
3783  s,
3785  }
3786  notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3787  GNUNET_free (queue);
3788 
3789  vl = lookup_virtual_link (&neighbour->pid);
3790  if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n))
3791  {
3793  check_link_down (vl);
3794  }
3795  if (NULL == neighbour->queue_head)
3796  {
3797  free_neighbour (neighbour);
3798  }
3799 }
3800 
3801 
3807 static void
3809 {
3810  struct TransportClient *tc = ale->tc;
3811 
3812  GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
3813  tc->details.communicator.addr_tail,
3814  ale);
3815  if (NULL != ale->sc)
3816  {
3818  "store cancel\n");
3820  ale->sc = NULL;
3821  }
3822  if (NULL != ale->st)
3823  {
3824  GNUNET_SCHEDULER_cancel (ale->st);
3825  ale->st = NULL;
3826  }
3827  GNUNET_free (ale);
3828 }
3829 
3830 
3839 static int
3841  const struct GNUNET_PeerIdentity *pid,
3842  void *value)
3843 {
3844  struct TransportClient *tc = cls;
3845  struct PeerRequest *pr = value;
3846 
3848  pr->wc = NULL;
3849  GNUNET_assert (
3850  GNUNET_YES ==
3851  GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
3852  pid,
3853  pr));
3854  GNUNET_free (pr);
3855 
3856  return GNUNET_OK;
3857 }
3858 
3859 
3860 static void
3861 do_shutdown (void *cls);
3862 
3871 static void
3873  struct GNUNET_SERVICE_Client *client,
3874  void *app_ctx)
3875 {
3876  struct TransportClient *tc = app_ctx;
3877 
3878  (void) cls;
3879  (void) client;
3881  switch (tc->type)
3882  {
3883  case CT_NONE:
3885  "Unknown Client %p disconnected, cleaning up.\n",
3886  tc);
3887  break;
3888 
3889  case CT_CORE: {
3891  "CORE Client %p disconnected, cleaning up.\n",
3892  tc);
3893 
3894  struct PendingMessage *pm;
3895 
3896  while (NULL != (pm = tc->details.core.pending_msg_head))
3897  {
3899  tc->details.core.pending_msg_head,
3900  tc->details.core.pending_msg_tail,
3901  pm);
3902  pm->client = NULL;
3903  }
3904  }
3905  break;
3906 
3907  case CT_MONITOR:
3909  "MONITOR Client %p disconnected, cleaning up.\n",
3910  tc);
3911 
3912  break;
3913 
3914  case CT_COMMUNICATOR: {
3916  "COMMUNICATOR Client %p disconnected, cleaning up.\n",
3917  tc);
3918 
3919  struct Queue *q;
3920  struct AddressListEntry *ale;
3921 
3922  while (NULL != (q = tc->details.communicator.queue_head))
3923  free_queue (q);
3924  while (NULL != (ale = tc->details.communicator.addr_head))
3926  GNUNET_free (tc->details.communicator.address_prefix);
3927  }
3928  break;
3929 
3930  case CT_APPLICATION:
3932  "APPLICATION Client %p disconnected, cleaning up.\n",
3933  tc);
3934 
3935  GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
3937  tc);
3938  GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
3939  break;
3940  }
3941  GNUNET_free (tc);
3942  if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
3943  {
3945  "Our last client disconnected\n");
3946  do_shutdown (cls);
3947  }
3948 }
3949 
3950 
3960 static int
3962  const struct GNUNET_PeerIdentity *pid,
3963  void *value)
3964 {
3965  struct TransportClient *tc = cls;
3966 
3967  (void) value;
3969  "Telling new CORE client about existing connection to %s\n",
3970  GNUNET_i2s (pid));
3972  return GNUNET_OK;
3973 }
3974 
3975 
3984 static void
3985 handle_client_start (void *cls, const struct StartMessage *start)
3986 {
3987  struct TransportClient *tc = cls;
3988  uint32_t options;
3989 
3990  options = ntohl (start->options);
3991  if ((0 != (1 & options)) &&
3992  (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
3993  {
3994  /* client thinks this is a different peer, reject */
3995  GNUNET_break (0);
3996  GNUNET_SERVICE_client_drop (tc->client);
3997  return;
3998  }
3999  if (CT_NONE != tc->type)
4000  {
4001  GNUNET_break (0);
4002  GNUNET_SERVICE_client_drop (tc->client);
4003  return;
4004  }
4005  tc->type = CT_CORE;
4007  "New CORE client with PID %s registered\n",
4008  GNUNET_i2s (&start->self));
4011  tc);
4013 }
4014 
4015 
4022 static int
4023 check_client_send (void *cls, const struct OutboundMessage *obm)
4024 {
4025  struct TransportClient *tc = cls;
4026  uint16_t size;
4027  const struct GNUNET_MessageHeader *obmm;
4028 
4029  if (CT_CORE != tc->type)
4030  {
4031  GNUNET_break (0);
4032  return GNUNET_SYSERR;
4033  }
4034  size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4035  if (size < sizeof(struct GNUNET_MessageHeader))
4036  {
4037  GNUNET_break (0);
4038  return GNUNET_SYSERR;
4039  }
4040  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4041  if (size != ntohs (obmm->size))
4042  {
4043  GNUNET_break (0);
4044  return GNUNET_SYSERR;
4045  }
4046  return GNUNET_OK;
4047 }
4048 
4049 
4057 static void
4059 {
4060  struct TransportClient *tc = pm->client;
4061  struct VirtualLink *vl = pm->vl;
4062 
4064  "client send response\n");
4065  if (NULL != tc)
4066  {
4067  struct GNUNET_MQ_Envelope *env;
4068  struct SendOkMessage *so_msg;
4069 
4071  so_msg->peer = vl->target;
4073  "Confirming transmission of <%llu> to %s\n",
4074  pm->logging_uuid,
4075  GNUNET_i2s (&vl->target));
4076  GNUNET_MQ_send (tc->mq, env);
4077  }
4079 }
4080 
4081 
4091 static unsigned int
4094  struct DistanceVectorHop **hops_array,
4095  unsigned int hops_array_length)
4096 {
4097  uint64_t choices[hops_array_length];
4098  uint64_t num_dv;
4099  unsigned int dv_count;
4100 
4101  /* Pick random vectors, but weighted by distance, giving more weight
4102  to shorter vectors */
4103  num_dv = 0;
4104  dv_count = 0;
4105  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4106  pos = pos->next_dv)
4107  {
4108  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4109  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4110  .rel_value_us == 0))
4111  continue; /* pos unconfirmed and confirmed required */
4112  num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4113  dv_count++;
4114  }
4115  if (0 == dv_count)
4116  return 0;
4117  if (dv_count <= hops_array_length)
4118  {
4119  dv_count = 0;
4120  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4121  pos = pos->next_dv)
4122  hops_array[dv_count++] = pos;
4123  return dv_count;
4124  }
4125  for (unsigned int i = 0; i < hops_array_length; i++)
4126  {
4127  int ok = GNUNET_NO;
4128  while (GNUNET_NO == ok)
4129  {
4130  choices[i] =
4132  ok = GNUNET_YES;
4133  for (unsigned int j = 0; j < i; j++)
4134  if (choices[i] == choices[j])
4135  {
4136  ok = GNUNET_NO;
4137  break;
4138  }
4139  }
4140  }
4141  dv_count = 0;
4142  num_dv = 0;
4143  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4144  pos = pos->next_dv)
4145  {
4146  uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4147 
4148  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4149  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4150  .rel_value_us == 0))
4151  continue; /* pos unconfirmed and confirmed required */
4152  for (unsigned int i = 0; i < hops_array_length; i++)
4153  if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4154  hops_array[dv_count++] = pos;
4155  num_dv += delta;
4156  }
4157  return dv_count;
4158 }
4159 
4160 
4167 static int
4169  void *cls,
4170  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4171 {
4172  struct TransportClient *tc = cls;
4173  uint16_t size;
4174 
4175  if (CT_NONE != tc->type)
4176  {
4177  GNUNET_break (0);
4178  return GNUNET_SYSERR;
4179  }
4180  tc->type = CT_COMMUNICATOR;
4181  size = ntohs (cam->header.size) - sizeof(*cam);
4182  if (0 == size)
4183  return GNUNET_OK; /* receive-only communicator */
4185  return GNUNET_OK;
4186 }
4187 
4188 
4194 static void
4196 {
4197  if (0 != ntohl (cmc->im.fc_on))
4198  {
4199  /* send ACK when done to communicator for flow control! */
4200  struct GNUNET_MQ_Envelope *env;
4201  struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4202 
4204  ack->reserved = htonl (0);
4205  ack->fc_id = cmc->im.fc_id;
4206  ack->sender = cmc->im.sender;
4207  GNUNET_MQ_send (cmc->tc->mq, env);
4208  }
4210  GNUNET_free (cmc);
4211 }
4212 
4213 
4223 static void
4224 handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4225 {
4226  struct TransportClient *tc = cls;
4227  struct VirtualLink *vl;
4228  uint32_t delta;
4229  struct CommunicatorMessageContext *cmc;
4230 
4231  if (CT_CORE != tc->type)
4232  {
4233  GNUNET_break (0);
4234  GNUNET_SERVICE_client_drop (tc->client);
4235  return;
4236  }
4237  vl = lookup_virtual_link (&rom->peer);
4238  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4239  {
4241  "# RECV_OK dropped: virtual link unknown",
4242  1,
4243  GNUNET_NO);
4245  return;
4246  }
4247  delta = ntohl (rom->increase_window_delta);
4248  vl->core_recv_window += delta;
4249  if (vl->core_recv_window <= 0)
4250  return;
4251  /* resume communicators */
4252  while (NULL != (cmc = vl->cmc_tail))
4253  {
4255  finish_cmc_handling (cmc);
4256  }
4257 }
4258 
4259 
4266 static void
4268  void *cls,
4269  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4270 {
4271  struct TransportClient *tc = cls;
4272  uint16_t size;
4273 
4274  size = ntohs (cam->header.size) - sizeof(*cam);
4275  if (0 == size)
4276  {
4278  "Receive-only communicator connected\n");
4279  return; /* receive-only communicator */
4280  }
4281  tc->details.communicator.address_prefix =
4282  GNUNET_strdup ((const char *) &cam[1]);
4283  tc->details.communicator.cc =
4284  (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
4286  "Communicator with prefix `%s' connected\n",
4287  tc->details.communicator.address_prefix);
4289 }
4290 
4291 
4299 static int
4301  void *cls,
4302  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4303 {
4304  const struct GNUNET_MessageHeader *inbox;
4305  const char *is;
4306  uint16_t msize;
4307  uint16_t isize;
4308 
4309  (void) cls;
4310  msize = ntohs (cb->header.size) - sizeof(*cb);
4311  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
4312  isize = ntohs (inbox->size);
4313  if (isize >= msize)
4314  {
4315  GNUNET_break (0);
4316  return GNUNET_SYSERR;
4317  }
4318  is = (const char *) inbox;
4319  is += isize;
4320  msize -= isize;
4321  GNUNET_assert (0 < msize);
4322  if ('\0' != is[msize - 1])
4323  {
4324  GNUNET_break (0);
4325  return GNUNET_SYSERR;
4326  }
4327  return GNUNET_OK;
4328 }
4329 
4330 
4337 static void
4339 {
4340  struct EphemeralConfirmationPS ec;
4341 
4342  if (0 !=
4344  return;
4346  dv->ephemeral_validity =
4351  ec.target = dv->target;
4352  ec.ephemeral_key = dv->ephemeral_key;
4354  ec.purpose.size = htonl (sizeof(ec));
4356  &ec,
4357  &dv->sender_sig);
4358 }
4359 
4360 
4370 static void
4372  struct PendingMessage *pm,
4373  const void *payload,
4374  size_t payload_size)
4375 {
4376  struct Neighbour *n = queue->neighbour;
4377  struct GNUNET_TRANSPORT_SendMessageTo *smt;
4378  struct GNUNET_MQ_Envelope *env;
4379 
4380  GNUNET_log (
4382  "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
4383  (unsigned int) payload_size,
4384  (NULL == pm) ? 0 : pm->logging_uuid,
4385  (unsigned long long) queue->qid,
4386  GNUNET_i2s (&queue->neighbour->pid));
4387  env = GNUNET_MQ_msg_extra (smt,
4388  payload_size,
4390  smt->qid = queue->qid;
4391  smt->mid = queue->mid_gen;
4392  smt->receiver = n->pid;
4393  memcpy (&smt[1], payload, payload_size);
4394  {
4395  /* Pass the env to the communicator of queue for transmission. */
4396  struct QueueEntry *qe;
4397 
4398  qe = GNUNET_new (struct QueueEntry);
4399  qe->mid = queue->mid_gen++;
4400  qe->queue = queue;
4401  if (NULL != pm)
4402  {
4403  qe->pm = pm;
4404  // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4405  // GNUNET_assert (NULL == pm->qe);
4406  /*if (NULL != pm->qe)
4407  {
4408  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4409  "Retransmitting message <%llu> remove pm from qe with MID: %llu \n",
4410  pm->logging_uuid,
4411  (unsigned long long) pm->qe->mid);
4412  pm->qe->pm = NULL;
4413  }*/
4414  pm->qe = qe;
4415  }
4416  GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4417  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4418  queue->queue_length++;
4419  queue->tc->details.communicator.total_queue_length++;
4420  if (0 == queue->q_capacity)
4421  return;
4422  if (GNUNET_NO == queue->unlimited_length)
4423  queue->q_capacity--;
4425  "Queue %s with qid %u has capacity %lu\n",
4426  queue->address,
4427  queue->qid,
4428  queue->q_capacity);
4430  queue->tc->details.communicator.total_queue_length)
4431  queue->idle = GNUNET_NO;
4432  if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4433  queue->idle = GNUNET_NO;
4434  if (0 == queue->q_capacity)
4435  queue->idle = GNUNET_NO;
4437  "Sending message MID %llu of type %u (%u) and size %u with MQ %p\n",
4438  smt->mid,
4439  ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4440  ntohs (smt->header.size),
4441  payload_size,
4442  queue->tc->mq);
4443  GNUNET_MQ_send (queue->tc->mq, env);
4444  }
4445 }
4446 
4447 
4458 static struct GNUNET_TIME_Relative
4459 route_via_neighbour (const struct Neighbour *n,
4460  const struct GNUNET_MessageHeader *hdr,
4462 {
4463  struct GNUNET_TIME_Absolute now;
4464  unsigned int candidates;
4465  unsigned int sel1;
4466  unsigned int sel2;
4467  struct GNUNET_TIME_Relative rtt;
4468 
4469  /* Pick one or two 'random' queues from n (under constraints of options) */
4470  now = GNUNET_TIME_absolute_get ();
4471  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4472  weight in the future; weight could be assigned by observed
4473  bandwidth (note: not sure if we should do this for this type
4474  of control traffic though). */
4475  candidates = 0;
4476  for (struct Queue *pos = n->queue_head; NULL != pos;
4477  pos = pos->next_neighbour)
4478  {
4479  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4480  (pos->validated_until.abs_value_us > now.abs_value_us))
4481  candidates++;
4482  }
4483  if (0 == candidates)
4484  {
4485  /* This can happen rarely if the last confirmed queue timed
4486  out just as we were beginning to process this message. */
4488  "Could not route message of type %u to %s: no valid queue\n",
4489  ntohs (hdr->type),
4490  GNUNET_i2s (&n->pid));
4492  "# route selection failed (all no valid queue)",
4493  1,
4494  GNUNET_NO);
4496  }
4497 
4500  if (0 == (options & RMO_REDUNDANT))
4501  sel2 = candidates; /* picks none! */
4502  else
4504  candidates = 0;
4505  for (struct Queue *pos = n->queue_head; NULL != pos;
4506  pos = pos->next_neighbour)
4507  {
4508  if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4509  (pos->validated_until.abs_value_us > now.abs_value_us))
4510  {
4511  if ((sel1 == candidates) || (sel2 == candidates))
4512  {
4514  "Routing message of type %u to %s using %s (#%u)\n",
4515  ntohs (hdr->type),
4516  GNUNET_i2s (&n->pid),
4517  pos->address,
4518  (sel1 == candidates) ? 1 : 2);
4519  rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4520  queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4521  }
4522  candidates++;
4523  }
4524  }
4525  return rtt;
4526 }
4527 
4528 
4533 {
4537  gcry_cipher_hd_t cipher;
4538 
4542  struct
4543  {
4548 
4552  char aes_key[256 / 8];
4553 
4557  char aes_ctr[128 / 8];
4559 };
4560 
4561 
4570 static void
4572  const struct GNUNET_ShortHashCode *iv,
4573  struct DVKeyState *key)
4574 {
4575  /* must match #dh_key_derive_eph_pub */
4577  GNUNET_CRYPTO_kdf (&key->material,
4578  sizeof(key->material),
4579  "transport-backchannel-key",
4580  strlen ("transport-backchannel-key"),
4581  km,
4582  sizeof(*km),
4583  iv,
4584  sizeof(*iv),
4585  NULL));
4587  "Deriving backchannel key based on KM %s and IV %s\n",
4588  GNUNET_h2s (km),
4589  GNUNET_sh2s (iv));
4590  GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4591  GCRY_CIPHER_AES256 /* low level: go for speed */,
4592  GCRY_CIPHER_MODE_CTR,
4593  0 /* flags */));
4594  GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4595  &key->material.aes_key,
4596  sizeof(key->material.aes_key)));
4597  gcry_cipher_setctr (key->cipher,
4598  &key->material.aes_ctr,
4599  sizeof(key->material.aes_ctr));
4600 }
4601 
4602 
4612 static void
4614  const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
4615  const struct GNUNET_PeerIdentity *target,
4616  const struct GNUNET_ShortHashCode *iv,
4617  struct DVKeyState *key)
4618 {
4619  struct GNUNET_HashCode km;
4620 
4622  &target->public_key,
4623  &km));
4624  dv_setup_key_state_from_km (&km, iv, key);
4625 }
4626 
4627 
4637 static void
4639  const struct GNUNET_ShortHashCode *iv,
4640  struct DVKeyState *key)
4641 {
4642  struct GNUNET_HashCode km;
4643 
4645  pub_ephemeral,
4646  &km));
4647  dv_setup_key_state_from_km (&km, iv, key);
4648 }
4649 
4650 
4660 static void
4661 dv_hmac (const struct DVKeyState *key,
4662  struct GNUNET_HashCode *hmac,
4663  const void *data,
4664  size_t data_size)
4665 {
4666  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4667 }
4668 
4669 
4679 static void
4680 dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4681 {
4682  GNUNET_assert (0 ==
4683  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4684 }
4685 
4686 
4696 static void
4698  void *out,
4699  const void *ciph,
4700  size_t out_size)
4701 {
4702  GNUNET_assert (
4703  0 == gcry_cipher_decrypt (key->cipher, out, out_size, ciph, out_size));
4704 }
4705 
4706 
4712 static void
4714 {
4715  gcry_cipher_close (key->cipher);
4716  GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
4717 }
4718 
4719 
4730 typedef void (*DVMessageHandler) (void *cls,
4731  struct Neighbour *next_hop,
4732  const struct GNUNET_MessageHeader *hdr,
4734 
4749 static struct GNUNET_TIME_Relative
4751  unsigned int num_dvhs,
4752  struct DistanceVectorHop **dvhs,
4753  const struct GNUNET_MessageHeader *hdr,
4754  DVMessageHandler use,
4755  void *use_cls,
4757  enum GNUNET_GenericReturnValue without_fc)
4758 {
4759  struct TransportDVBoxMessage box_hdr;
4760  struct TransportDVBoxPayloadP payload_hdr;
4761  uint16_t enc_body_size = ntohs (hdr->size);
4762  char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
4763  struct TransportDVBoxPayloadP *enc_payload_hdr =
4764  (struct TransportDVBoxPayloadP *) enc;
4765  struct DVKeyState *key;
4766  struct GNUNET_TIME_Relative rtt;
4767 
4768  key = GNUNET_new (struct DVKeyState);
4769  /* Encrypt payload */
4771  box_hdr.total_hops = htons (0);
4772  box_hdr.without_fc = htons (without_fc);
4773  update_ephemeral (dv);
4774  box_hdr.ephemeral_key = dv->ephemeral_key;
4775  payload_hdr.sender_sig = dv->sender_sig;
4776 
4778  &box_hdr.iv,
4779  sizeof(box_hdr.iv));
4780  dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, key);
4781  payload_hdr.sender = GST_my_identity;
4782  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
4783  dv_encrypt (key, &payload_hdr, enc_payload_hdr, sizeof(payload_hdr));
4784  dv_encrypt (key,
4785  hdr,
4786  &enc[sizeof(struct TransportDVBoxPayloadP)],
4787  enc_body_size);
4788  dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
4789  dv_key_clean (key);
4791  /* For each selected path, take the pre-computed header and body
4792  and add the path in the middle of the message; then send it. */
4793  for (unsigned int i = 0; i < num_dvhs; i++)
4794  {
4795  struct DistanceVectorHop *dvh = dvhs[i];
4796  unsigned int num_hops = dvh->distance + 1;
4797  char buf[sizeof(struct TransportDVBoxMessage)
4798  + sizeof(struct GNUNET_PeerIdentity) * num_hops
4799  + sizeof(struct TransportDVBoxPayloadP)
4800  + enc_body_size] GNUNET_ALIGN;
4801  struct GNUNET_PeerIdentity *dhops;
4802 
4803  box_hdr.header.size = htons (sizeof(buf));
4804  box_hdr.orig_size = htons (sizeof(buf));
4805  box_hdr.num_hops = htons (num_hops);
4806  memcpy (buf, &box_hdr, sizeof(box_hdr));
4807  dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
4808  memcpy (dhops,
4809  dvh->path,
4810  dvh->distance * sizeof(struct GNUNET_PeerIdentity));
4811  dhops[dvh->distance] = dv->target;
4812  if (GNUNET_EXTRA_LOGGING > 0)
4813  {
4814  char *path;
4815 
4817  for (unsigned int j = 0; j < num_hops; j++)
4818  {
4819  char *tmp;
4820 
4821  GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
4822  GNUNET_free (path);
4823  path = tmp;
4824  }
4826  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
4827  ntohs (hdr->type),
4828  GNUNET_i2s (&dv->target),
4829  i + 1,
4830  num_dvhs,
4831  path);
4832  GNUNET_free (path);
4833  }
4834  rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
4835  memcpy (&dhops[num_hops], enc, sizeof(enc));
4836  use (use_cls,
4837  dvh->next_hop,
4838  (const struct GNUNET_MessageHeader *) buf,
4839  options);
4840  GNUNET_free (key);
4841  }
4842  return rtt;
4843 }
4844 
4845 
4855 static void
4857  struct Neighbour *next_hop,
4858  const struct GNUNET_MessageHeader *hdr,
4860 {
4861  (void) cls;
4862  (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
4863 }
4864 
4865 
4877 static struct GNUNET_TIME_Relative
4879 // route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
4880  const struct GNUNET_MessageHeader *hdr,
4882 {
4883  // struct VirtualLink *vl;
4884  struct Neighbour *n;
4885  struct DistanceVector *dv;
4886  struct GNUNET_TIME_Relative rtt1;
4887  struct GNUNET_TIME_Relative rtt2;
4888  const struct GNUNET_PeerIdentity *target = &vl->target;
4889 
4891  "Trying to route message of type %u to %s without fc\n",
4892  ntohs (hdr->type),
4893  GNUNET_i2s (target));
4894 
4895  // TODO Do this elsewhere. vl should be given as parameter to method.
4896  // vl = lookup_virtual_link (target);
4897  GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
4898  if (NULL == vl)
4900  n = vl->n;
4901  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
4902  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
4903  {
4904  /* if confirmed is required, and we do not have anything
4905  confirmed, drop respective options */
4906  if (NULL == n)
4907  n = lookup_neighbour (target);
4908  if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
4910  }
4911  if ((NULL == n) && (NULL == dv))
4912  {
4914  "Cannot route message of type %u to %s: no route\n",
4915  ntohs (hdr->type),
4916  GNUNET_i2s (target));
4918  "# Messages dropped in routing: no acceptable method",
4919  1,
4920  GNUNET_NO);
4922  }
4924  "Routing message of type %u to %s with options %X\n",
4925  ntohs (hdr->type),
4926  GNUNET_i2s (target),
4927  (unsigned int) options);
4928  /* If both dv and n are possible and we must choose:
4929  flip a coin for the choice between the two; for now 50/50 */
4930  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
4931  {
4933  n = NULL;
4934  else
4935  dv = NULL;
4936  }
4937  if ((NULL != n) && (NULL != dv))
4938  options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
4939  enough for redundancy, so clear the flag. */
4942  if (NULL != n)
4943  {
4945  "Try to route message of type %u to %s without fc via neighbour\n",
4946  ntohs (hdr->type),
4947  GNUNET_i2s (target));
4948  rtt1 = route_via_neighbour (n, hdr, options);
4949  }
4950  if (NULL != dv)
4951  {
4952  struct DistanceVectorHop *hops[2];
4953  unsigned int res;
4954 
4956  options,
4957  hops,
4958  (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
4959  if (0 == res)
4960  {
4962  "Failed to route message, could not determine DV path\n");
4963  return rtt1;
4964  }
4966  "encapsulate_for_dv 1\n");
4967  rtt2 = encapsulate_for_dv (dv,
4968  res,
4969  hops,
4970  hdr,
4972  NULL,
4973  options & (~RMO_REDUNDANT),
4974  GNUNET_YES);
4975  }
4976  return GNUNET_TIME_relative_min (rtt1, rtt2);
4977 }
4978 
4979 
4980 static void
4981 consider_sending_fc (void *cls);
4982 
4989 static void
4991 {
4992  struct VirtualLink *vl = cls;
4993  vl->fc_retransmit_task = NULL;
4994  consider_sending_fc (cls);
4995 }
4996 
4997 
5004 static void
5006 {
5007  struct VirtualLink *vl = cls;
5008  struct GNUNET_TIME_Absolute monotime;
5009  struct TransportFlowControlMessage fc;
5011  struct GNUNET_TIME_Relative rtt;
5012 
5014  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5015  it always! */
5016  /* For example, we should probably ONLY do this if a bit more than
5017  an RTT has passed, or if the window changed "significantly" since
5018  then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5019  need an estimate for the bandwidth-delay-product for the entire
5020  VL, as that determines "significantly". We have the delay, but
5021  the bandwidth statistics need to be added for the VL!*/(void) duration;
5022 
5024  "Sending FC seq %u to %s with new window %llu\n",
5025  (unsigned int) vl->fc_seq_gen,
5026  GNUNET_i2s (&vl->target),
5027  (unsigned long long) vl->incoming_fc_window_size);
5029  vl->last_fc_transmission = monotime;
5031  fc.header.size = htons (sizeof(fc));
5032  fc.seq = htonl (vl->fc_seq_gen++);
5036  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
5038  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5039  {
5042  "FC retransmission to %s failed, will retry in %s\n",
5043  GNUNET_i2s (&vl->target),
5046  }
5047  else
5048  {
5049  /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5050  vl->last_fc_rtt = rtt;
5051  }
5052  if (NULL != vl->fc_retransmit_task)
5055  {
5057  vl->fc_retransmit_count = 0;
5058  }
5059  vl->fc_retransmit_task =
5061  vl->fc_retransmit_count++;
5062 }
5063 
5064 
5081 static void
5083 {
5084  struct Neighbour *n = vl->n;
5085  struct DistanceVector *dv = vl->dv;
5086  struct GNUNET_TIME_Absolute now;
5087  struct VirtualLink *vl_next_hop;
5088  int elig;
5089 
5091  "check_vl_transmission to target %s\n",
5092  GNUNET_i2s (&vl->target));
5093  /* Check that we have an eligible pending message!
5094  (cheaper than having #transmit_on_queue() find out!) */
5095  elig = GNUNET_NO;
5096  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5097  pm = pm->next_vl)
5098  {
5100  "check_vl_transmission loop\n");
5101  if (NULL != pm->qe)
5102  continue; /* not eligible, is in a queue! */
5103  if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5105  {
5107  "Stalled message %lu transmission on VL %s due to flow control: %llu < %llu\n",
5108  pm->logging_uuid,
5109  GNUNET_i2s (&vl->target),
5110  (unsigned long long) vl->outbound_fc_window_size,
5111  (unsigned long long) (pm->bytes_msg
5113  consider_sending_fc (vl);
5114  return; /* We have a message, but flow control says "nope" */
5115  }
5117  "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5118  GNUNET_i2s (&vl->target));
5119  /* Notify queues at direct neighbours that we are interested */
5120  now = GNUNET_TIME_absolute_get ();
5121  if (NULL != n)
5122  {
5123  for (struct Queue *queue = n->queue_head; NULL != queue;
5124  queue = queue->next_neighbour)
5125  {
5126  if ((GNUNET_YES == queue->idle) &&
5127  (queue->validated_until.abs_value_us > now.abs_value_us))
5128  {
5130  "Direct neighbour %s not stalled\n",
5131  GNUNET_i2s (&n->pid));
5133  queue,
5135  elig = GNUNET_YES;
5136  }
5137  else
5139  "Neighbour Queue QID: %u (%u) busy or invalid\n",
5140  queue->qid,
5141  queue->idle);
5142  }
5143  }
5144  /* Notify queues via DV that we are interested */
5145  if (NULL != dv)
5146  {
5147  /* Do DV with lower scheduler priority, which effectively means that
5148  IF a neighbour exists and is available, we prefer it. */
5149  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5150  pos = pos->next_dv)
5151  {
5152  struct Neighbour *nh = pos->next_hop;
5153 
5154 
5155  if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5156  continue; /* skip this one: path not validated */
5157  else
5158  {
5159  vl_next_hop = lookup_virtual_link (&nh->pid);
5160  if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5161  vl_next_hop->outbound_fc_window_size)
5162  {
5164  "Stalled message %lu transmission on next hop %s due to flow control: %llu < %llu\n",
5165  pm->logging_uuid,
5166  GNUNET_i2s (&vl_next_hop->target),
5167  (unsigned long
5168  long) vl_next_hop->outbound_fc_window_size,
5169  (unsigned long long) (pm->bytes_msg
5170  + vl_next_hop->
5171  outbound_fc_window_size_used));
5172  consider_sending_fc (vl_next_hop);
5173  continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5174  }
5175  for (struct Queue *queue = nh->queue_head; NULL != queue;
5176  queue = queue->next_neighbour)
5177  if ((GNUNET_YES == queue->idle) &&
5178  (queue->validated_until.abs_value_us > now.abs_value_us))
5179  {
5181  "Next hop neighbour %s not stalled\n",
5182  GNUNET_i2s (&nh->pid));
5184  queue,
5186  elig = GNUNET_YES;
5187  }
5188  else
5190  "DV Queue QID: %u (%u) busy or invalid\n",
5191  queue->qid,
5192  queue->idle);
5193  }
5194  }
5195  }
5196  if (GNUNET_YES == elig)
5198  "Eligible message %lu of size %llu to %s: %llu/%llu\n",
5199  pm->logging_uuid,
5200  pm->bytes_msg,
5201  GNUNET_i2s (&vl->target),
5202  (unsigned long long) vl->outbound_fc_window_size,
5203  (unsigned long long) (pm->bytes_msg
5205  break;
5206  }
5207 }
5208 
5209 
5216 static void
5217 handle_client_send (void *cls, const struct OutboundMessage *obm)
5218 {
5219  struct TransportClient *tc = cls;
5220  struct PendingMessage *pm;
5221  const struct GNUNET_MessageHeader *obmm;
5222  uint32_t bytes_msg;
5223  struct VirtualLink *vl;
5225 
5226  GNUNET_assert (CT_CORE == tc->type);
5227  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5228  bytes_msg = ntohs (obmm->size);
5229  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5230  vl = lookup_virtual_link (&obm->peer);
5231  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5232  {
5234  "Don't have %s as a neighbour (anymore).\n",
5235  GNUNET_i2s (&obm->peer));
5236  /* Failure: don't have this peer as a neighbour (anymore).
5237  Might have gone down asynchronously, so this is NOT
5238  a protocol violation by CORE. Still count the event,
5239  as this should be rare. */
5242  "# messages dropped (neighbour unknown)",
5243  1,
5244  GNUNET_NO);
5245  return;
5246  }
5247 
5248  pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5250  "1 created pm %p storing vl %p\n",
5251  pm,
5252  vl);
5253  pm->logging_uuid = logging_uuid_gen++;
5254  pm->prefs = pp;
5255  pm->client = tc;
5256  pm->vl = vl;
5257  pm->bytes_msg = bytes_msg;
5258  memcpy (&pm[1], obmm, bytes_msg);
5260  "Sending %u bytes as <%llu> to %s\n",
5261  bytes_msg,
5262  pm->logging_uuid,
5263  GNUNET_i2s (&obm->peer));
5265  tc->details.core.pending_msg_head,
5266  tc->details.core.pending_msg_tail,
5267  pm);
5269  vl->pending_msg_head,
5270  vl->pending_msg_tail,
5271  pm);
5272  check_vl_transmission (vl);
5274 }
5275 
5276 
5286 static void
5288  void *cls,
5289  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
5290 {
5291  struct Neighbour *n;
5292  struct VirtualLink *vl;
5293  struct TransportClient *tc = cls;
5294  const struct GNUNET_MessageHeader *inbox =
5295  (const struct GNUNET_MessageHeader *) &cb[1];
5296  uint16_t isize = ntohs (inbox->size);
5297  const char *is = ((const char *) &cb[1]) + isize;
5298  size_t slen = strlen (is) + 1;
5299  char
5300  mbuf[slen + isize
5301  + sizeof(struct
5305 
5306  /* 0-termination of 'is' was checked already in
5307  #check_communicator_backchannel() */
5309  "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5310  GNUNET_i2s (&cb->pid),
5311  is,
5312  ntohs (inbox->type),
5313  ntohs (inbox->size));
5314  /* encapsulate and encrypt message */
5315  be->header.type =
5317  be->header.size = htons (sizeof(mbuf));
5318  memcpy (&be[1], inbox, isize);
5319  memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5320  + isize],
5321  is,
5322  strlen (is) + 1);
5323  // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5324  vl = lookup_virtual_link (&cb->pid);
5325  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5326  {
5328  }
5329  else
5330  {
5331  /* Use route via neighbour */
5332  n = lookup_neighbour (&cb->pid);
5333  if (NULL != n)
5335  n,
5336  &be->header,
5337  RMO_NONE);
5338  }
5340 }
5341 
5342 
5350 static int
5352  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5353 {
5354  struct TransportClient *tc = cls;
5355 
5356  if (CT_COMMUNICATOR != tc->type)
5357  {
5358  GNUNET_break (0);
5359  return GNUNET_SYSERR;
5360  }
5362  return GNUNET_OK;
5363 }
5364 
5365 
5371 static void
5372 store_pi (void *cls);
5373 
5374 
5381 static void
5382 peerstore_store_own_cb (void *cls, int success)
5383 {
5384  struct AddressListEntry *ale = cls;
5385 
5386  ale->sc = NULL;
5387  if (GNUNET_YES != success)
5389  "Failed to store our own address `%s' in peerstore!\n",
5390  ale->address);
5391  else
5393  "Successfully stored our own address `%s' in peerstore!\n",
5394  ale->address);
5395  /* refresh period is 1/4 of expiration time, that should be plenty
5396  without being excessive. */
5397  ale->st =
5399  4ULL),
5400  &store_pi,
5401  ale);
5402 }
5403 
5404 
5410 static void
5411 store_pi (void *cls)
5412 {
5413  struct AddressListEntry *ale = cls;
5414  void *addr;
5415  size_t addr_len;
5417 
5418  ale->st = NULL;
5421  "Storing our address `%s' in peerstore until %s!\n",
5422  ale->address,
5425  ale->nt,
5428  &addr,
5429  &addr_len);
5431  "transport",
5432  &GST_my_identity,
5434  addr,
5435  addr_len,
5436  expiration,
5439  ale);
5440  GNUNET_free (addr);
5441  if (NULL == ale->sc)
5442  {
5444  "Failed to store our address `%s' with peerstore\n",
5445  ale->address);
5446  ale->st =
5448  }
5449 }
5450 
5451 
5458 static void
5460  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5461 {
5462  struct TransportClient *tc = cls;
5463  struct AddressListEntry *ale;
5464  size_t slen;
5465 
5466  /* 0-termination of &aam[1] was checked in #check_add_address */
5468  "Communicator added address `%s'!\n",
5469  (const char *) &aam[1]);
5470  slen = ntohs (aam->header.size) - sizeof(*aam);
5471  ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5472  ale->tc = tc;
5473  ale->address = (const char *) &ale[1];
5474  ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
5475  ale->aid = aam->aid;
5476  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5477  memcpy (&ale[1], &aam[1], slen);
5478  GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5479  tc->details.communicator.addr_tail,
5480  ale);
5481  ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5483 }
5484 
5485 
5492 static void
5494  const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5495 {
5496  struct TransportClient *tc = cls;
5497  struct AddressListEntry *alen;
5498 
5499  if (CT_COMMUNICATOR != tc->type)
5500  {
5501  GNUNET_break (0);
5502  GNUNET_SERVICE_client_drop (tc->client);
5503  return;
5504  }
5505  for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5506  NULL != ale;
5507  ale = alen)
5508  {
5509  alen = ale->next;
5510  if (dam->aid != ale->aid)
5511  continue;
5512  GNUNET_assert (ale->tc == tc);
5514  "Communicator deleted address `%s'!\n",
5515  ale->address);
5518  return;
5519  }
5521  "Communicator removed address we did not even have.\n");
5523  // GNUNET_SERVICE_client_drop (tc->client);
5524 }
5525 
5526 
5534 static void
5536  const struct GNUNET_MessageHeader *msg);
5537 
5538 
5546 static void
5547 core_env_sent_cb (void *cls)
5548 {
5549  struct CoreSentContext *ctx = cls;
5550  struct VirtualLink *vl = ctx->vl;
5551 
5552  if (NULL == vl)
5553  {
5554  /* lost the link in the meantime, ignore */
5555  GNUNET_free (ctx);
5556  return;
5557  }
5560  vl->incoming_fc_window_size_ram -= ctx->size;
5561  vl->incoming_fc_window_size_used += ctx->isize;
5562  consider_sending_fc (vl);
5563  GNUNET_free (ctx);
5564 }
5565 
5566 
5575 static void
5576 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5577 {
5578  struct CommunicatorMessageContext *cmc = cls;
5579  struct VirtualLink *vl;
5580  uint16_t size = ntohs (mh->size);
5581  int have_core;
5582 
5584  "Handling raw message of type %u with %u bytes\n",
5585  (unsigned int) ntohs (mh->type),
5586  (unsigned int) ntohs (mh->size));
5587 
5588  if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
5589  (size < sizeof(struct GNUNET_MessageHeader)))
5590  {
5591  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5592 
5593  GNUNET_break (0);
5594  finish_cmc_handling (cmc);
5595  GNUNET_SERVICE_client_drop (client);
5596  return;
5597  }
5598  vl = lookup_virtual_link (&cmc->im.sender);
5599  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5600  {
5601  /* FIXME: sender is giving us messages for CORE but we don't have
5602  the link up yet! I *suspect* this can happen right now (i.e.
5603  sender has verified us, but we didn't verify sender), but if
5604  we pass this on, CORE would be confused (link down, messages
5605  arrive). We should investigate more if this happens often,
5606  or in a persistent manner, and possibly do "something" about
5607  it. Thus logging as error for now. */
5608  GNUNET_break_op (0);
5610  "# CORE messages dropped (virtual link still down)",
5611  1,
5612  GNUNET_NO);
5613 
5615  "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
5616  (unsigned int) ntohs (mh->type),
5617  (unsigned int) ntohs (mh->size));
5618  finish_cmc_handling (cmc);
5619  return;
5620  }
5621  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5622  {
5624  "# CORE messages dropped (FC arithmetic overflow)",
5625  1,
5626  GNUNET_NO);
5628  "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5629  (unsigned int) ntohs (mh->type),
5630  (unsigned int) ntohs (mh->size));
5631  finish_cmc_handling (cmc);
5632  return;
5633  }
5635  {
5637  "# CORE messages dropped (FC window overflow)",
5638  1,
5639  GNUNET_NO);
5641  "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5642  (unsigned int) ntohs (mh->type),
5643  (unsigned int) ntohs (mh->size));
5644  finish_cmc_handling (cmc);
5645  return;
5646  }
5647 
5648  /* Forward to all CORE clients */
5649  have_core = GNUNET_NO;
5650  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5651  {
5652  struct GNUNET_MQ_Envelope *env;
5653  struct InboundMessage *im;
5654  struct CoreSentContext *ctx;
5655 
5656  if (CT_CORE != tc->type)
5657  continue;
5660  ctx = GNUNET_new (struct CoreSentContext);
5661  ctx->vl = vl;
5662  ctx->size = size;
5663  ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5664  have_core = GNUNET_YES;
5667  im->peer = cmc->im.sender;
5668  memcpy (&im[1], mh, size);
5669  GNUNET_MQ_send (tc->mq, env);
5670  vl->core_recv_window--;
5671  }
5672  if (GNUNET_NO == have_core)
5673  {
5675  "Dropped message to CORE: no CORE client connected!\n");
5676  /* Nevertheless, count window as used, as it is from the
5677  perspective of the other peer! */
5679  /* TODO-M1 */
5681  "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
5682  (unsigned int) ntohs (mh->type),
5683  (unsigned int) ntohs (mh->size));
5684  finish_cmc_handling (cmc);
5685  return;
5686  }
5688  "Delivered message from %s of type %u to CORE\n",
5689  GNUNET_i2s (&cmc->im.sender),
5690  ntohs (mh->type));
5691  if (vl->core_recv_window > 0)
5692  {
5693  finish_cmc_handling (cmc);
5694  return;
5695  }
5696  /* Wait with calling #finish_cmc_handling(cmc) until the message
5697  was processed by CORE MQs (for CORE flow control)! */
5699 }
5700 
5701 
5709 static int
5711 {
5712  uint16_t size = ntohs (fb->header.size);
5713  uint16_t bsize = size - sizeof(*fb);
5714 
5715  (void) cls;
5716  if (0 == bsize)
5717  {
5718  GNUNET_break_op (0);
5719  return GNUNET_SYSERR;
5720  }
5721  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
5722  {
5723  GNUNET_break_op (0);
5724  return GNUNET_SYSERR;
5725  }
5726  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
5727  {
5728  GNUNET_break_op (0);
5729  return GNUNET_SYSERR;
5730  }
5731  return GNUNET_YES;
5732 }
5733 
5734 
5740 static void
5742 {
5743  struct AcknowledgementCummulator *ac = cls;
5744 
5745  ac->task = NULL;
5746  GNUNET_assert (0 == ac->num_acks);
5747  GNUNET_assert (
5748  GNUNET_YES ==
5750  GNUNET_free (ac);
5751 }
5752 
5753 
5759 static void
5761 {
5762  struct Neighbour *n;
5763  struct VirtualLink *vl;
5764  struct AcknowledgementCummulator *ac = cls;
5765  char buf[sizeof(struct TransportReliabilityAckMessage)
5766  + ac->num_acks
5768  struct TransportReliabilityAckMessage *ack =
5771 
5772  ac->task = NULL;
5774  "Sending ACK with %u components to %s\n",
5775  ac->num_acks,
5776  GNUNET_i2s (&ac->target));
5777  GNUNET_assert (0 < ac->num_acks);
5779  ack->header.size =
5780  htons (sizeof(*ack)
5781  + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
5782  ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
5783  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
5784  for (unsigned int i = 0; i < ac->num_acks; i++)
5785  {
5786  ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
5788  GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5789  }
5790  /*route_control_message_without_fc (
5791  &ac->target,
5792  &ack->header,
5793  RMO_DV_ALLOWED);*/
5794  vl = lookup_virtual_link (&ac->target);
5795  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5796  {
5798  vl,
5799  &ack->header,
5800  RMO_DV_ALLOWED);
5801  }
5802  else
5803  {
5804  /* Use route via neighbour */
5805  n = lookup_neighbour (&ac->target);
5806  if (NULL != n)
5808  n,
5809  &ack->header,
5810  RMO_NONE);
5811  }
5812  ac->num_acks = 0;
5815  ac);
5816 }
5817 
5818 
5827 static void
5829  const struct AcknowledgementUUIDP *ack_uuid,
5830  struct GNUNET_TIME_Absolute max_delay)
5831 {
5832  struct AcknowledgementCummulator *ac;
5833 
5835  "Scheduling ACK %s for transmission to %s\n",
5836  GNUNET_uuid2s (&ack_uuid->value),
5837  GNUNET_i2s (pid));
5839  if (NULL == ac)
5840  {
5842  ac->target = *pid;
5843  ac->min_transmission_time = max_delay;
5847  &ac->target,
5848  ac,
5850  }
5851  else
5852  {
5853  if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
5854  {
5855  /* must run immediately, ack buffer full! */
5857  }
5858  GNUNET_SCHEDULER_cancel (ac->task);
5859  ac->min_transmission_time =
5860  GNUNET_TIME_absolute_min (ac->min_transmission_time, max_delay);
5861  }
5862  GNUNET_assert (ac->num_acks < MAX_CUMMULATIVE_ACKS);
5863  ac->ack_uuids[ac->num_acks].receive_time = GNUNET_TIME_absolute_get ();
5864  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
5865  ac->num_acks++;
5866  ac->task = GNUNET_SCHEDULER_add_at (ac->min_transmission_time,
5868  ac);
5869 }
5870 
5871 
5876 {
5880  struct MessageUUIDP message_uuid;
5881 
5886 };
5887 
5888 
5898 static int
5899 find_by_message_uuid (void *cls, uint32_t key, void *value)
5900 {
5901  struct FindByMessageUuidContext *fc = cls;
5902  struct ReassemblyContext *rc = value;
5903 
5904  (void) key;
5905  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
5906  {
5907  fc->rc = rc;
5908  return GNUNET_NO;
5909  }
5910  return GNUNET_YES;
5911 }
5912 
5913 
5921 static void
5923 {
5924  struct CommunicatorMessageContext *cmc = cls;
5925  struct VirtualLink *vl;
5926  struct ReassemblyContext *rc;
5927  const struct GNUNET_MessageHeader *msg;
5928  uint16_t msize;
5929  uint16_t fsize;
5930  uint16_t frag_off;
5931  char *target;
5932  struct GNUNET_TIME_Relative cdelay;
5933  struct FindByMessageUuidContext fc;
5934 
5935  vl = lookup_virtual_link (&cmc->im.sender);
5936  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5937  {
5938  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5939 
5941  "No virtual link for %s to handle fragment\n",
5942  GNUNET_i2s (&cmc->im.sender));
5943  GNUNET_break (0);
5944  finish_cmc_handling (cmc);
5945  GNUNET_SERVICE_client_drop (client);
5946  return;
5947  }
5948  if (NULL == vl->reassembly_map)
5949  {
5951  vl->reassembly_heap =
5956  vl);
5957  }
5958  msize = ntohs (fb->msg_size);
5959  fc.message_uuid = fb->msg_uuid;
5960  fc.rc = NULL;
5962  fb->msg_uuid.uuid,
5964  &fc);
5965  fsize = ntohs (fb->header.size) - sizeof(*fb);
5966  if (NULL == (rc = fc.rc))
5967  {
5968  rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
5969  + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
5970  rc->msg_uuid = fb->msg_uuid;
5971  rc->virtual_link = vl;
5972  rc->msg_size = msize;
5973  rc->reassembly_timeout =
5977  rc,
5981  vl->reassembly_map,
5982  rc->msg_uuid.uuid,
5983  rc,
5985  target = (char *) &rc[1];
5986  rc->bitfield = (uint8_t *) (target + rc->msg_size);
5987  if (fsize != rc->msg_size)
5988  rc->msg_missing = rc->msg_size;
5989  else
5990  rc->msg_missing = 0;
5992  "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %u\n",
5993  fsize,
5994  ntohs (fb->frag_off),
5995  msize,
5996  rc->msg_missing,
5997  GNUNET_i2s (&cmc->im.sender),
5998  (unsigned int) fb->msg_uuid.uuid);
5999  }
6000  else
6001  {
6002  target = (char *) &rc[1];
6004  "Received fragment at offset %u/%u from %s for message %u\n",
6005  ntohs (fb->frag_off),
6006  msize,
6007  GNUNET_i2s (&cmc->im.sender),
6008  (unsigned int) fb->msg_uuid.uuid);
6009  }
6010  if (msize != rc->msg_size)
6011  {
6012  GNUNET_break (0);
6013  finish_cmc_handling (cmc);
6014  return;
6015  }
6016 
6017  /* reassemble */
6018  if (0 == fsize)
6019  {
6020  GNUNET_break (0);
6021  finish_cmc_handling (cmc);
6022  return;
6023  }
6024  frag_off = ntohs (fb->frag_off);
6025  if (frag_off + fsize > msize)
6026  {
6027  /* Fragment (plus fragment size) exceeds message size! */
6028  GNUNET_break_op (0);
6029  finish_cmc_handling (cmc);
6030  return;
6031  }
6032  memcpy (&target[frag_off], &fb[1], fsize);
6033  /* update bitfield and msg_missing */
6034  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6035  {
6036  if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6037  {
6038  rc->bitfield[i / 8] |= (1 << (i % 8));
6039  rc->msg_missing--;
6040  }
6041  }
6042 
6043  /* Compute cumulative ACK */
6045  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6046  if (0 == rc->msg_missing)
6047  cdelay = GNUNET_TIME_UNIT_ZERO;
6048  cummulative_ack (&cmc->im.sender,
6049  &fb->ack_uuid,
6052  /* is reassembly complete? */
6053  if (0 != rc->msg_missing)
6054  {
6055  finish_cmc_handling (cmc);
6056  return;
6057  }
6058  /* reassembly is complete, verify result */
6059  msg = (const struct GNUNET_MessageHeader *) &rc[1];
6060  if (ntohs (msg->size) != rc->msg_size)
6061  {
6062  GNUNET_break (0);
6064  finish_cmc_handling (cmc);
6065  return;
6066  }
6067  /* successful reassembly */
6069  "Fragment reassembly complete for message %u\n",
6070  (unsigned int) fb->msg_uuid.uuid);
6071  /* FIXME: check that the resulting msg is NOT a
6072  DV Box or Reliability Box, as that is NOT allowed! */
6073  demultiplex_with_cmc (cmc, msg);
6074  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6075  en-route and we forget that we finished this reassembly immediately!
6076  -> keep around until timeout?
6077  -> shorten timeout based on ACK? */
6079 }
6080 
6081 
6089 static int
6091  const struct TransportReliabilityBoxMessage *rb)
6092 {
6093  (void) cls;
6094  const struct GNUNET_MessageHeader *inbox = (const struct
6095  GNUNET_MessageHeader *) &rb[1];
6096 
6098  "check_send_msg with size %u: inner msg type %u and size %u (%u %u)\n",
6099  ntohs (rb->header.size),
6100  ntohs (inbox->type),
6101  ntohs (inbox->size),
6102  sizeof (struct TransportReliabilityBoxMessage),
6103  sizeof (struct GNUNET_MessageHeader));
6105  return GNUNET_YES;
6106 }
6107 
6108 
6116 static void
6118  const struct TransportReliabilityBoxMessage *rb)
6119 {
6120  struct CommunicatorMessageContext *cmc = cls;
6121  const struct GNUNET_MessageHeader *inbox =
6122  (const struct GNUNET_MessageHeader *) &rb[1];
6123  struct GNUNET_TIME_Relative rtt;
6124 
6126  "Received reliability box from %s with UUID %s of type %u\n",
6127  GNUNET_i2s (&cmc->im.sender),
6128  GNUNET_uuid2s (&rb->ack_uuid.value),
6129  (unsigned int) ntohs (inbox->type));
6130  rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6131  do not really have an RTT for the
6132  * incoming* queue (should we have
6133  the sender add it to the rb message?) */
6134  cummulative_ack (
6135  &cmc->im.sender,
6136  &rb->ack_uuid,
6137  (0 == ntohl (rb->ack_countdown))
6140  GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6141  /* continue with inner message */
6142  /* FIXME: check that inbox is NOT a DV Box, fragment or another
6143  reliability box (not allowed!) */
6144  demultiplex_with_cmc (cmc, inbox);
6145 }
6146 
6147 
6156 static void
6157 update_pd_age (struct PerformanceData *pd, unsigned int age)
6158 {
6159  unsigned int sage;
6160 
6161  if (age == pd->last_age)
6162  return; /* nothing to do */
6163  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6164  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6165  {
6166  struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6167 
6168  the->bytes_sent = 0;
6169  the->bytes_received = 0;
6170  }
6171  pd->last_age = age;
6172 }
6173 
6174 
6183 static void
6185  struct GNUNET_TIME_Relative rtt,
6186  uint16_t bytes_transmitted_ok)
6187 {
6188  uint64_t nval = rtt.rel_value_us;
6189  uint64_t oval = pd->aged_rtt.rel_value_us;
6190  unsigned int age = get_age ();
6191  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6192 
6193  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6194  pd->aged_rtt = rtt;
6195  else
6196  pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
6197  update_pd_age (pd, age);
6198  the->bytes_received += bytes_transmitted_ok;
6199 }
6200 
6201 
6209 static void
6211  struct GNUNET_TIME_Relative rtt,
6212  uint16_t bytes_transmitted_ok)
6213 {
6214  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6215 }
6216 
6217 
6225 static void
6227  struct GNUNET_TIME_Relative rtt,
6228  uint16_t bytes_transmitted_ok)
6229 {
6230  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6231 }
6232 
6233 
6241 static void
6243 {
6244  struct PendingMessage *pos;
6245 
6247  "Complete transmission of message %llu %u\n",
6248  pm->logging_uuid,
6249  pm->pmt);
6250  switch (pm->pmt)
6251  {
6252  case PMT_CORE:
6253  case PMT_RELIABILITY_BOX:
6254  /* Full message sent, we are done */
6256  return;
6257 
6258  case PMT_FRAGMENT_BOX:
6259  /* Fragment sent over reliable channel */
6260  pos = pm->frag_parent;
6261  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6263  /* check if subtree is done */
6264  while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
6265  (NULL != pos->frag_parent))
6266  {
6267  pm = pos;
6268  pos = pm->frag_parent;
6269  if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6270  {
6272  return;
6273  }
6274  else if (PMT_DV_BOX == pm->pmt)
6275  {
6276  client_send_response (pos);
6277  return;
6278  }
6279  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
6281  }
6282 
6283  /* Was this the last applicable fragment? */
6284  if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
6285  (pos->frag_off == pos->bytes_msg))
6286  client_send_response (pos);
6287  return;
6288 
6289  case PMT_DV_BOX:
6291  "Completed transmission of message %llu (DV Box)\n",
6292  pm->logging_uuid);
6293  if (NULL != pm->frag_parent)
6294  {
6295  if (NULL != pm->bpm)
6296  {
6297  GNUNET_free (pm->bpm);
6298  }
6299  client_send_response (pm->frag_parent);
6300  }
6301  else
6303  return;
6304  }
6305 }
6306 
6307 
6315 static void
6317  struct GNUNET_TIME_Relative ack_delay)
6318 {
6319  struct GNUNET_TIME_Relative delay;
6320 
6322  if (delay.rel_value_us > ack_delay.rel_value_us)
6324  else
6325  delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6326  if (NULL != pa->queue)
6328  if (NULL != pa->dvh)
6330  if (NULL != pa->pm)
6333 }
6334 
6335 
6343 static int
6345  const struct TransportReliabilityAckMessage *ra)
6346 {
6347  unsigned int n_acks;
6348 
6349  (void) cls;
6350  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6351  / sizeof(struct TransportCummulativeAckPayloadP);
6352  if (0 == n_acks)
6353  {
6354  GNUNET_break_op (0);
6355  return GNUNET_SYSERR;
6356  }
6357  if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6358  n_acks * sizeof(struct TransportCummulativeAckPayloadP))
6359  {
6360  GNUNET_break_op (0);
6361  return GNUNET_SYSERR;
6362  }
6363  return GNUNET_OK;
6364 }
6365 
6366 
6374 static void
6376  const struct TransportReliabilityAckMessage *ra)
6377 {
6378  struct CommunicatorMessageContext *cmc = cls;
6379  const struct TransportCummulativeAckPayloadP *ack;
6380  unsigned int n_acks;
6381  uint32_t ack_counter;
6382 
6383  n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6384  / sizeof(struct TransportCummulativeAckPayloadP);
6385  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
6386  for (unsigned int i = 0; i < n_acks; i++)
6387  {
6388  struct PendingAcknowledgement *pa =
6390  if (NULL == pa)
6391  {
6393  "Received ACK from %s with UUID %s which is unknown to us!\n",
6394  GNUNET_i2s (&cmc->im.sender),
6395  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6397  GST_stats,
6398  "# FRAGMENT_ACKS dropped, no matching pending message",
6399  1,
6400  GNUNET_NO);
6401  continue;
6402  }
6404  "Received ACK from %s with UUID %s\n",
6405  GNUNET_i2s (&cmc->im.sender),
6406  GNUNET_uuid2s (&ack[i].ack_uuid.value));
6407  handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
6408  }
6409 
6410  ack_counter = htonl (ra->ack_counter);
6411  (void) ack_counter; /* silence compiler warning for now */
6412  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
6413  // (DV and/or Neighbour?)
6414  finish_cmc_handling (cmc);
6415 }
6416 
6417 
6425 static int
6427  void *cls,
6429 {
6430  uint16_t size = ntohs (be->header.size) - sizeof(*be);
6431  const struct GNUNET_MessageHeader *inbox =
6432  (const struct GNUNET_MessageHeader *) &be[1];
6433  const char *is;
6434  uint16_t isize;
6435 
6436  (void) cls;
6437  if (ntohs (inbox->size) >= size)
6438  {
6439  GNUNET_break_op (0);
6440  return GNUNET_SYSERR;
6441  }
6442  isize = ntohs (inbox->size);
6443  is = ((const char *) inbox) + isize;
6444  size -= isize;
6445  if ('\0' != is[size - 1])
6446  {
6447  GNUNET_break_op (0);
6448  return GNUNET_SYSERR;
6449  }
6450  return GNUNET_YES;
6451 }
6452 
6453 
6462 static void
6464  void *cls,
6466 {
6467  struct CommunicatorMessageContext *cmc = cls;
6468  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
6469  struct GNUNET_MQ_Envelope *env;
6470  struct TransportClient *tc;
6471  const struct GNUNET_MessageHeader *inbox =
6472  (const struct GNUNET_MessageHeader *) &be[1];
6473  uint16_t isize = ntohs (inbox->size);
6474  const char *target_communicator = ((const char *) inbox) + isize;
6475  char *sender;
6476  char *self;
6477 
6478  GNUNET_asprintf (&sender,
6479  "%s",
6480  GNUNET_i2s (&cmc->im.sender));
6481  GNUNET_asprintf (&self,
6482  "%s",
6484 
6485  /* Find client providing this communicator */
6486  for (tc = clients_head; NULL != tc; tc = tc->next)
6487  if ((CT_COMMUNICATOR == tc->type) &&
6488  (0 ==
6489  strcmp (tc->details.communicator.address_prefix, target_communicator)))
6490  break;
6491  if (NULL == tc)
6492  {
6493  char *stastr;
6494 
6495  GNUNET_asprintf (
6496  &stastr,
6497  "# Backchannel message dropped: target communicator `%s' unknown",
6498  target_communicator);
6500  GNUNET_free (stastr);
6501  finish_cmc_handling (cmc);
6502  return;
6503  }
6504  /* Finally, deliver backchannel message to communicator */
6506  "Delivering backchannel message from %s to %s of type %u to %s\n",
6507  sender,
6508  self,
6509  ntohs (inbox->type),
6510  target_communicator);
6512  cbi,
6513  isize,
6515  cbi->pid = cmc->im.sender;
6516  memcpy (&cbi[1], inbox, isize);
6517  GNUNET_MQ_send (tc->mq, env);
6518  finish_cmc_handling (cmc);
6519 }
6520 
6521 
6531 static void
6532 path_cleanup_cb (void *cls)
6533 {
6534  struct DistanceVector *dv = cls;
6535  struct DistanceVectorHop *pos;
6536 
6537  dv->timeout_task = NULL;
6538  while (NULL != (pos = dv->dv_head))
6539  {
6540  GNUNET_assert (dv == pos->dv);
6542  break;
6544  }
6545  if (NULL == pos)
6546  {
6547  free_dv_route (dv);
6548  return;
6549  }
6550  dv->timeout_task =
6552 }
6553 
6554 
6562 static void
6564 {
6565  struct DistanceVector *dv = hop->dv;
6566  struct VirtualLink *vl;
6567 
6568  vl = lookup_virtual_link (&dv->target);
6569  if (NULL == vl)
6570  {
6571 
6572  vl = GNUNET_new (struct VirtualLink);
6574  "Creating new virtual link %p to %s using DV!\n",
6575  vl,
6576  GNUNET_i2s (&dv->target));
6577  vl->confirmed = GNUNET_YES;
6578  vl->message_uuid_ctr =
6580  vl->target = dv->target;
6586  links,
6587  &vl->target,
6588  vl,
6590  vl->dv = dv;
6591  dv->vl = vl;
6592  vl->visibility_task =
6594  consider_sending_fc (vl);
6595  /* We lacked a confirmed connection to the target
6596  before, so tell CORE about it (finally!) */
6598  }
6599  else
6600  {
6601  /* Link was already up, remember dv is also now available and we are done */
6602  vl->dv = dv;
6603  dv->vl = vl;
6604  if (GNUNET_NO == vl->confirmed)
6605  {
6606  vl->confirmed = GNUNET_YES;
6607  vl->visibility_task =
6609  consider_sending_fc (vl);
6610  /* We lacked a confirmed connection to the target
6611  before, so tell CORE about it (finally!) */
6613  }
6614  else
6616  "Virtual link to %s could now also use DV!\n",
6617  GNUNET_i2s (&dv->target));
6618  }
6619 }
6620 
6621 
6647 static int
6649  unsigned int path_len,
6650  struct GNUNET_TIME_Relative network_latency,
6651  struct GNUNET_TIME_Absolute path_valid_until)
6652 {
6653  struct DistanceVectorHop *hop;
6654  struct DistanceVector *dv;
6655  struct Neighbour *next_hop;
6656  unsigned int shorter_distance;
6657 
6658  if (path_len < 3)
6659  {
6660  /* what a boring path! not allowed! */
6661  GNUNET_break (0);
6662  return GNUNET_SYSERR;
6663  }
6664  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
6665  next_hop = lookup_neighbour (&path[1]);
6666  if (NULL == next_hop)
6667  {
6668  /* next hop must be a neighbour, otherwise this whole thing is useless! */
6669  GNUNET_break (0);
6670  return GNUNET_SYSERR;
6671  }
6672  for (unsigned int i = 2; i < path_len; i++)
6673  if (NULL != lookup_neighbour (&path[i]))
6674  {
6675  /* Useless path: we have a direct connection to some hop
6676  in the middle of the path, so this one is not even
6677  terribly useful for redundancy */
6679  "Path of %u hops useless: directly link to hop %u (%s)\n",
6680  path_len,
6681  i,
6682  GNUNET_i2s (&path[i]));
6684  "# Useless DV path ignored: hop is neighbour",
6685  1,
6686  GNUNET_NO);
6687  return GNUNET_SYSERR;
6688  }
6689  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
6690  if (NULL == dv)
6691  {
6692  dv = GNUNET_new (struct DistanceVector);
6693  dv->target = path[path_len - 1];
6695  &path_cleanup_cb,
6696  dv);
6699  dv_routes,
6700  &dv->target,
6701  dv,
6703  }
6704  /* Check if we have this path already! */
6705  shorter_distance = 0;
6706  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
6707  pos = pos->next_dv)
6708  {
6709  if (pos->distance < path_len - 3)
6710  shorter_distance++;
6711  /* Note that the distances in 'pos' excludes us (path[0]),
6712  the next_hop (path[1]) and the target so we need to subtract three
6713  and check next_hop explicitly */
6714  if ((pos->distance == path_len - 3) && (pos->next_hop == next_hop))
6715  {
6716  int match = GNUNET_YES;
6717 
6718  for (unsigned int i = 0; i < pos->distance; i++)
6719  {
6720  if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
6721  {
6722  match = GNUNET_NO;
6723  break;
6724  }
6725  }
6726  if (GNUNET_YES == match)
6727  {
6728  struct GNUNET_TIME_Relative last_timeout;
6729 
6730  /* Re-discovered known path, update timeout */
6732  "# Known DV path refreshed",
6733  1,
6734  GNUNET_NO);
6735  last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
6736  pos->timeout =
6738  pos->path_valid_until =
6739  GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
6740  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
6741  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
6742  if (0 <
6745  if (last_timeout.rel_value_us <
6748  .rel_value_us)
6749  {
6750  /* Some peer send DV learn messages too often, we are learning
6751  the same path faster than it would be useful; do not forward! */
6753  "Rediscovered path too quickly, not forwarding further\n");
6754  return GNUNET_NO;
6755  }
6757  "Refreshed known path to %s valid until %s, forwarding further\n",
6758  GNUNET_i2s (&dv->target),
6760  pos->path_valid_until));
6761  return GNUNET_YES;
6762  }
6763  }
6764  }
6765  /* Count how many shorter paths we have (incl. direct
6766  neighbours) before simply giving up on this one! */
6767  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
6768  {
6769  /* We have a shorter path already! */
6771  "Have many shorter DV paths %s, not forwarding further\n",
6772  GNUNET_i2s (&dv->target));
6773  return GNUNET_NO;
6774  }
6775  /* create new DV path entry */
6777  "Discovered new DV path to %s valid until %s\n",
6778  GNUNET_i2s (&dv->target),
6779  GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
6780  hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
6781  + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
6782  hop->next_hop = next_hop;
6783  hop->dv = dv;
6784  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
6785  memcpy (&hop[1],
6786  &path[2],
6787  sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
6789  hop->path_valid_until = path_valid_until;
6790  hop->distance = path_len - 3;
6791  hop->pd.aged_rtt = network_latency;
6792  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
6793  GNUNET_CONTAINER_MDLL_insert (neighbour,
6794  next_hop->dv_head,
6795  next_hop->dv_tail,
6796  hop);
6797  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
6799  return GNUNET_YES;
6800 }
6801 
6802 
6810 static int
6811 check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
6812 {
6813  uint16_t size = ntohs (dvl->header.size);
6814  uint16_t num_hops = ntohs (dvl->num_hops);
6815  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
6816 
6817  (void) cls;
6818  if (size != sizeof(*dvl) + num_hops * sizeof(struct DVPathEntryP))
6819  {
6820  GNUNET_break_op (0);
6821  return GNUNET_SYSERR;
6822  }
6823  if (num_hops > MAX_DV_HOPS_ALLOWED)
6824  {
6825  GNUNET_break_op (0);
6826  return GNUNET_SYSERR;
6827  }
6828  for (unsigned int i = 0; i < num_hops; i++)
6829  {
6830  if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
6831  {
6832  GNUNET_break_op (0);
6833  return GNUNET_SYSERR;
6834  }
6835  if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
6836  {
6837  GNUNET_break_op (0);
6838  return GNUNET_SYSERR;
6839  }
6840  }
6841  return GNUNET_YES;
6842 }
6843 
6844 
6856 static void
6857 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
6858  const struct TransportDVLearnMessage *msg,
6859  uint16_t bi_history,
6860  uint16_t nhops,
6861  const struct DVPathEntryP *hops,
6862  struct GNUNET_TIME_Absolute in_time)
6863 {
6864  struct Neighbour *n;
6865  struct VirtualLink *vl;
6866  struct DVPathEntryP *dhops;
6867  char buf[sizeof(struct TransportDVLearnMessage)
6868  + (nhops + 1) * sizeof(struct DVPathEntryP)] GNUNET_ALIGN;
6869  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
6870  struct GNUNET_TIME_Relative nnd;
6871 
6872  /* compute message for forwarding */
6874  "Forwarding DV learn message originating from %s to %s\n",
6875  GNUNET_i2s (&msg->initiator),
6876  GNUNET_i2s2 (next_hop));
6879  fwd->header.size = htons (sizeof(struct TransportDVLearnMessage)
6880  + (nhops + 1) * sizeof(struct DVPathEntryP));
6881  fwd->num_hops = htons (nhops + 1);
6882  fwd->bidirectional = htons (bi_history);
6885  msg->non_network_delay));
6887  fwd->init_sig = msg->init_sig;
6888  fwd->initiator = msg->initiator;
6889  fwd->challenge = msg->challenge;
6890  fwd->monotonic_time = msg->monotonic_time;
6891  dhops = (struct DVPathEntryP *) &fwd[1];
6892  GNUNET_memcpy (dhops, hops, sizeof(struct DVPathEntryP) * nhops);
6893  dhops[nhops].hop = GST_my_identity;
6894  {
6895  struct DvHopPS dhp = {
6897  .purpose.size = htonl (sizeof(dhp)),
6898  .pred = (0 == nhops) ? msg->initiator : dhops[nhops - 1].hop,
6899  .succ = *next_hop,
6900  .challenge = msg->challenge
6901  };
6903  &dhp,
6904  &dhops[nhops].hop_sig);
6905  }
6906  /*route_control_message_without_fc (next_hop,
6907  &fwd->header,
6908  RMO_UNCONFIRMED_ALLOWED);*/
6909  vl = lookup_virtual_link (next_hop);
6910  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6911  {
6913  &fwd->header,
6915  }
6916  else
6917  {
6918  /* Use route via neighbour */
6919  n = lookup_neighbour (next_hop);
6920  if (NULL != n)
6922  n,
6923  &fwd->header,
6925  }
6926 }
6927 
6928 
6938 static int
6940  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
6941  const struct GNUNET_PeerIdentity *init,
6943  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
6944 {
6945  struct DvInitPS ip = { .purpose.purpose = htonl (
6947  .purpose.size = htonl (sizeof(ip)),
6948  .monotonic_time = sender_monotonic_time,
6949  .challenge = *challenge };
6950 
6951  if (
6952  GNUNET_OK !=
6954  &ip,
6955  init_sig,
6956  &init->public_key))
6957  {
6958  GNUNET_break_op (0);
6959  return GNUNET_SYSERR;
6960  }
6961  return GNUNET_OK;
6962 }
6963 
6964 
6969 {
6974 
6978  const struct DVPathEntryP *hops;
6979 
6984 
6989 
6993  unsigned int num_eligible;
6994 
6998  unsigned int num_selections;
6999 
7003  uint16_t nhops;
7004 
7008  uint16_t bi_history;
7009 };
7010 
7011 
7020 static int
7022  const struct GNUNET_PeerIdentity *pid,
7023  void *value)
7024 {
7025  struct NeighbourSelectionContext *nsc = cls;
7026 
7027  (void) value;
7028  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7029  return GNUNET_YES; /* skip initiator */
7030  for (unsigned int i = 0; i < nsc->nhops; i++)
7031  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7032  return GNUNET_YES;
7033  /* skip peers on path */
7034  nsc->num_eligible++;
7035  return GNUNET_YES;
7036 }
7037 
7038 
7049 static int
7051  const struct GNUNET_PeerIdentity *pid,
7052  void *value)
7053 {
7054  struct NeighbourSelectionContext *nsc = cls;
7055 
7057  "transmission %s\n",
7058  GNUNET_i2s (pid));
7059  (void) value;
7060  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
7061  return GNUNET_YES; /* skip initiator */
7062  for (unsigned int i = 0; i < nsc->nhops; i++)
7063  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
7064  return GNUNET_YES;
7065  /* skip peers on path */
7066  for (unsigned int i = 0; i < nsc->num_selections; i++)
7067  {
7068  if (nsc->selections[i] == nsc->num_eligible)
7069  {
7071  nsc->dvl,
7072  nsc->bi_history,
7073  nsc->nhops,
7074  nsc->hops,
7075  nsc->in_time);
7076  break;
7077  }
7078  }
7079  nsc->num_eligible++;
7080  return GNUNET_YES;
7081 }
7082 
7083 
7127 static unsigned int
7128 calculate_fork_degree (unsigned int hops_taken,
7129  unsigned int neighbour_count,
7130  unsigned int eligible_count)
7131 {
7132  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
7133  double eligible_ratio =
7134  ((double) eligible_count) / ((double) neighbour_count);
7135  double boost_factor = eligible_ratio * eligible_ratio;
7136  unsigned int rnd;
7137  double left;
7138 
7139  if (hops_taken >= 64)
7140  {
7141  GNUNET_break (0);
7142  return 0; /* precaution given bitshift below */
7143  }
7144  for (unsigned int i = 1; i < hops_taken; i++)
7145  {
7146  /* For each hop, subtract the expected number of targets
7147  reached at distance d (so what remains divided by 2^d) */
7148  target_total -= (target_total * boost_factor / (1LLU << i));
7149  }
7150  rnd =
7151  (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
7152  /* round up or down probabilistically depending on how close we were
7153  when floor()ing to rnd */
7154  left = target_total - (double) rnd;
7155  if (UINT32_MAX * left >
7157  rnd++; /* round up */
7159  "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
7160  hops_taken,
7161  rnd,
7162  eligible_count,
7163  neighbour_count);
7164  return rnd;
7165 }
7166 
7167 
7174 static void
7175 neighbour_store_dvmono_cb (void *cls, int success)
7176 {
7177  struct Neighbour *n = cls;
7178 
7179  n->sc = NULL;
7180  if (GNUNET_YES != success)
7182  "Failed to store other peer's monotonic time in peerstore!\n");
7183 }
7184 
7185 
7193 static void
7194 handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
7195 {
7196  struct CommunicatorMessageContext *cmc = cls;
7198  int bi_hop;
7199  uint16_t nhops;
7200  uint16_t bi_history;
7201  const struct DVPathEntryP *hops;
7202  int do_fwd;
7203  int did_initiator;
7204  struct GNUNET_TIME_Absolute in_time;
7205  struct Neighbour *n;
7206 
7207  nhops = ntohs (dvl->num_hops); /* 0 = sender is initiator */
7208  bi_history = ntohs (dvl->bidirectional);
7209  hops = (const struct DVPathEntryP *) &dvl[1];
7210  if (0 == nhops)
7211  {
7212  /* sanity check */
7213  if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
7214  {
7215  GNUNET_break (0);
7216  finish_cmc_handling (cmc);
7217  return;
7218  }
7219  }
7220  else
7221  {
7223  "handle dv learn message last hop %s\n",
7224  GNUNET_i2s (&hops[nhops - 1].hop));
7225  /* sanity check */
7226  if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
7227  {
7228  GNUNET_break (0);
7229  finish_cmc_handling (cmc);
7230  return;
7231  }
7232  }
7233 
7234  GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
7235  cc = cmc->tc->details.communicator.cc;
7236  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
7237  cc); // FIXME: add bi-directional flag to cc?
7238  in_time = GNUNET_TIME_absolute_get ();
7239 
7240  /* continue communicator here, everything else can happen asynchronous! */
7241  finish_cmc_handling (cmc);
7242 
7243  n = lookup_neighbour (&dvl->initiator);
7244  if (NULL != n)
7245  {
7246  if ((n->dv_monotime_available == GNUNET_YES) &&
7249  {
7251  "DV learn from %s discarded due to time travel",
7252  GNUNET_i2s (&dvl->initiator));
7254  "# DV learn discarded due to time travel",
7255  1,
7256  GNUNET_NO);
7257  return;
7258  }
7260  &dvl->initiator,
7261  &dvl->challenge,
7262  &dvl->init_sig))
7263  {
7265  "DV learn signature from %s invalid\n",
7266  GNUNET_i2s (&dvl->initiator));
7267  GNUNET_break_op (0);
7268  return;
7269  }
7272  {
7273  if (NULL != n->sc)
7274  {
7276  "store cancel\n");
7278  }
7279  n->sc =
7281  "transport",
7282  &dvl->initiator,
7284  &dvl->monotonic_time,
7285  sizeof(dvl->monotonic_time),
7289  n);
7290  }
7291  }
7292  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
7293  If signature verification load too high, implement random drop strategy */
7294  for (unsigned int i = 0; i < nhops; i++)
7295  {
7296  struct DvHopPS dhp = { .purpose.purpose =
7298  .purpose.size = htonl (sizeof(dhp)),
7299  .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
7300  .succ = (nhops == i + 1) ? GST_my_identity
7301  : hops[i + 1].hop,
7302  .challenge = dvl->challenge };
7303 
7304  if (GNUNET_OK !=
7306  &dhp,
7307  &hops[i].hop_sig,
7308  &hops[i].hop.public_key))
7309  {
7311  "DV learn from %s signature of hop %u invalid\n",
7312  GNUNET_i2s (&dvl->initiator),
7313  i);
7315  "signature of hop %s invalid\n",
7316  GNUNET_i2s (&hops[i].hop));
7318  "pred %s\n",
7319  GNUNET_i2s (&dhp.pred));
7321  "succ %s\n",
7322  GNUNET_i2s (&dhp.succ));
7324  "hash %s\n",
7325  GNUNET_sh2s (&dhp.challenge.value));
7326  GNUNET_break_op (0);
7327  return;
7328  }
7329  }
7330  if (GNUNET_EXTRA_LOGGING > 0)
7331  {
7332  char *path;
7333 
7334  path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
7335  for (unsigned int i = 0; i < nhops; i++)
7336  {
7337  char *tmp;
7338 
7339  GNUNET_asprintf (&tmp,
7340  "%s%s%s",
7341  path,
7342  (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
7343  GNUNET_i2s (&hops[i].hop));
7344  GNUNET_free (path);
7345  path = tmp;
7346  }
7348  "Received DVInit via %s%s%s\n",
7349  path,
7350  bi_hop ? "<->" : "-->",
7352  GNUNET_free (path);
7353  }
7354  do_fwd = GNUNET_YES;
7355  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
7356  {
7357  struct GNUNET_PeerIdentity path[nhops + 1];
7358  struct GNUNET_TIME_Relative host_latency_sum;
7359  struct GNUNET_TIME_Relative latency;
7360  struct GNUNET_TIME_Relative network_latency;
7361  struct GNUNET_TIME_Absolute now;
7362 
7363  /* We initiated this, learn the forward path! */
7364  path[0] = GST_my_identity;
7365  path[1] = hops[0].hop;
7366  host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
7367 
7368  // Need also something to lookup initiation time
7369  // to compute RTT! -> add RTT argument here?
7370  now = GNUNET_TIME_absolute_get ();
7372  dvl->monotonic_time));
7373  GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
7374  // latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
7375  // (based on dvl->challenge, we can identify time of origin!)
7376 
7377  network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
7378  /* assumption: latency on all links is the same */
7379  network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
7380 
7381  for (unsigned int i = 2; i <= nhops; i++)
7382  {
7383  struct GNUNET_TIME_Relative ilat;
7384 
7385  /* assumption: linear latency increase per hop */
7386  ilat = GNUNET_TIME_relative_multiply (network_latency, i);
7387  path[i] = hops[i - 1].hop;
7389  "Learned path with %u hops to %s with latency %s\n",
7390  i,
7391  GNUNET_i2s (&path[i]),
7393  learn_dv_path (path,
7394  i + 1,
7395  ilat,
7398  }
7399  /* as we initiated, do not forward again (would be circular!) */
7400  do_fwd = GNUNET_NO;
7401  return;
7402  }
7403  if (bi_hop)
7404  {
7405  /* last hop was bi-directional, we could learn something here! */
7406  struct GNUNET_PeerIdentity path[nhops + 2];
7407 
7408  path[0] = GST_my_identity;
7409  path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
7410  for (unsigned int i = 0; i < nhops; i++)
7411  {
7412  int iret;
7413 
7414  if (0 == (bi_history & (1 << i)))
7415  break; /* i-th hop not bi-directional, stop learning! */
7416  if (i == nhops - 1)
7417  {
7418  path[i + 2] = dvl->initiator;
7419  }
7420  else
7421  {
7422  path[i + 2] = hops[nhops - i - 2].hop;
7423  }
7424 
7426  "Learned inverse path with %u hops to %s\n",
7427  i + 2,
7428  GNUNET_i2s (&path[i + 2]));
7429  iret = learn_dv_path (path,
7430  i + 3,
7433  if (GNUNET_SYSERR == iret)
7434  {
7435  /* path invalid or too long to be interesting for US, thus should also
7436  not be interesting to our neighbours, cut path when forwarding to
7437  'i' hops, except of course for the one that goes back to the
7438  initiator */
7440  "# DV learn not forwarded due invalidity of path",
7441  1,
7442  GNUNET_NO);
7443  do_fwd = GNUNET_NO;
7444  break;
7445  }
7446  if ((GNUNET_NO == iret) && (nhops == i + 1))
7447  {
7448  /* we have better paths, and this is the longest target,
7449  so there cannot be anything interesting later */
7451  "# DV learn not forwarded, got better paths",
7452  1,
7453  GNUNET_NO);
7454  do_fwd = GNUNET_NO;
7455  break;
7456  }
7457  }
7458  }
7459  if (MAX_DV_HOPS_ALLOWED == nhops)
7460  {
7461  /* At limit, we're out of here! */
7462  return;
7463  }
7464 
7465  /* Forward to initiator, if path non-trivial and possible */
7466  bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
7467  did_initiator = GNUNET_NO;
7468  if ((1 <= nhops) &&
7469  (GNUNET_YES ==
7471  {
7472  /* send back to origin! */
7474  "Sending DVL back to initiator %s\n",
7475  GNUNET_i2s (&dvl->initiator));
7476  forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
7477  did_initiator = GNUNET_YES;
7478  }
7479  /* We forward under two conditions: either we still learned something
7480  ourselves (do_fwd), or the path was darn short and thus the initiator is
7481  likely to still be very interested in this (and we did NOT already
7482  send it back to the initiator) */
7483