GNUnet  0.10.x
gnunet-service-tng.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
75 #include "platform.h"
76 #include "gnunet_util_lib.h"
80 #include "gnunet_hello_lib.h"
81 #include "gnunet_signatures.h"
82 #include "transport.h"
83 
88 #define MAX_CUMMULATIVE_ACKS 64
89 
102 #define FC_NO_CHANGE_REPLY_PROBABILITY 8
103 
108 #define IN_PACKET_SIZE_WITHOUT_MTU 128
109 
114 #define GOODPUT_AGING_SLOTS 4
115 
120 #define DEFAULT_WINDOW_SIZE (128 * 1024)
121 
130 #define MAX_INCOMING_REQUEST 16
131 
136 #define MAX_DV_DISCOVERY_SELECTION 16
137 
146 #define RECV_WINDOW_SIZE 4
147 
155 #define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
156 
160 #define MAX_DV_HOPS_ALLOWED 16
161 
166 #define MAX_DV_LEARN_PENDING 64
167 
171 #define MAX_DV_PATHS_TO_TARGET 3
172 
/* Five seconds. Going by the name, a delay threshold for warnings; the
   use site is not visible in this part of the file. */
178 #define DELAY_WARN_THRESHOLD \
179  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
180 
/* Sixty seconds. */
185 #define DV_FORWARD_TIMEOUT \
186  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
187 
/* One second. */
193 #define DV_QUALITY_RTT_THRESHOLD \
194  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
195 
/* Five minutes. */
200 #define DV_PATH_VALIDITY_TIMEOUT \
201  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
202 
/* Five minutes. */
207 #define BACKCHANNEL_INACTIVITY_TIMEOUT \
208  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
209 
/* Four minutes. */
214 #define DV_PATH_DISCOVERY_FREQUENCY \
215  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
216 
/* Four hours. */
220 #define EPHEMERAL_VALIDITY \
221  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
222 
/* Four minutes. */
226 #define REASSEMBLY_EXPIRATION \
227  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
228 
/* One minute. */
233 #define FAST_VALIDATION_CHALLENGE_FREQ \
234  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
235 
/* One day. */
239 #define MAX_VALIDATION_CHALLENGE_FREQ \
240  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
241 
/* Four hours. */
247 #define ACK_CUMMULATOR_TIMEOUT \
248  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
249 
/* The minute unit constant, used without a multiplier. */
254 #define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
255 
/* Plain number 100; unit and use site are not visible in this part of
   the file. */
260 #define DV_LEARN_QUALITY_THRESHOLD 100
261 
/* One month. */
265 #define MAX_ADDRESS_VALID_UNTIL \
266  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
267 
/* Four hours. */
271 #define ADDRESS_VALIDATION_LIFETIME \
272  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
273 
/* The millisecond unit constant, used without a multiplier. */
280 #define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
281 
288 #define VALIDATION_RTT_BUFFER_FACTOR 3
289 
296 #define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
297 
303 #define QUEUE_LENGTH_LIMIT 32
304 
305 
307 
312 {
317  uint64_t uuid GNUNET_PACKED;
318 };
319 
320 
325 {
330 };
331 
332 
337 {
341  struct GNUNET_ShortHashCode value;
342 };
343 
344 
349 {
353  struct GNUNET_MessageHeader header;
354 
355  /* Followed by *another* message header which is the message to
356  the communicator */
357 
358  /* Followed by a 0-terminated name of the communicator */
359 };
360 
361 
366 {
367 
372 
387  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time;
388 
392  struct GNUNET_PeerIdentity target;
393 
398  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
399 };
400 
401 
407 {
408 
412  struct GNUNET_PeerIdentity sender;
413 
418  struct GNUNET_CRYPTO_EddsaSignature sender_sig;
419 
430  struct GNUNET_TIME_AbsoluteNBO monotonic_time;
431 
432  /* Followed by a `struct GNUNET_MessageHeader` with a message
433  for the target peer */
434 };
435 
436 
442 {
446  struct GNUNET_MessageHeader header;
447 
454  uint32_t ack_countdown GNUNET_PACKED;
455 
461  struct AcknowledgementUUIDP ack_uuid;
462 };
463 
464 
469 {
476  struct GNUNET_TIME_RelativeNBO ack_delay;
477 
481  struct AcknowledgementUUIDP ack_uuid;
482 };
483 
484 
493 {
497  struct GNUNET_MessageHeader header;
498 
503  uint32_t ack_counter GNUNET_PACKED;
504 
505  /* followed by any number of `struct TransportCummulativeAckPayloadP`
506  messages providing ACKs */
507 };
508 
509 
514 {
518  struct GNUNET_MessageHeader header;
519 
523  uint16_t frag_off GNUNET_PACKED;
524 
528  uint16_t msg_size GNUNET_PACKED;
529 
537  struct AcknowledgementUUIDP ack_uuid;
538 
543  struct MessageUUIDP msg_uuid;
544 };
545 
546 
564 struct DvInitPS
565 {
570 
583  struct GNUNET_TIME_AbsoluteNBO monotonic_time;
584 
588  struct ChallengeNonceP challenge;
589 };
590 
591 
608 struct DvHopPS
609 {
614 
618  struct GNUNET_PeerIdentity pred;
619 
623  struct GNUNET_PeerIdentity succ;
624 
628  struct ChallengeNonceP challenge;
629 };
630 
631 
637 {
642 
648 };
649 
650 
665 {
669  struct GNUNET_MessageHeader header;
670 
675  uint16_t num_hops GNUNET_PACKED;
676 
685  uint16_t bidirectional GNUNET_PACKED;
686 
692  struct GNUNET_TIME_RelativeNBO non_network_delay;
693 
706  struct GNUNET_TIME_AbsoluteNBO monotonic_time;
707 
713 
717  struct GNUNET_PeerIdentity initiator;
718 
722  struct ChallengeNonceP challenge;
723 
724  /* Followed by @e num_hops `struct DVPathEntryP` values,
725  excluding the initiator of the DV trace; the last entry is the
726  current sender; the current peer must not be included. */
727 };
728 
729 
753 {
757  struct GNUNET_MessageHeader header;
758 
765  uint16_t total_hops GNUNET_PACKED;
766 
772  uint16_t num_hops GNUNET_PACKED;
773 
778  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
779 
786 
792  struct GNUNET_HashCode hmac;
793 
794  /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
795  excluding the @e origin and the current peer, the last must be
796  the ultimate target; if @e num_hops is zero, the receiver of this
797  message is the ultimate target. */
798 
799  /* Followed by encrypted, variable-size payload, which
800  must begin with a `struct TransportDVBoxPayloadP` */
801 
802  /* Followed by the actual message, which itself must not be
803  a DV_LEARN or DV_BOX message! */
804 };
805 
806 
812 {
813 
817  struct GNUNET_MessageHeader header;
818 
823 
827  struct ChallengeNonceP challenge;
828 
833  struct GNUNET_TIME_AbsoluteNBO sender_time;
834 };
835 
836 
842 {
843 
848 
853  struct GNUNET_TIME_RelativeNBO validity_duration;
854 
858  struct ChallengeNonceP challenge;
859 };
860 
861 
867 {
868 
872  struct GNUNET_MessageHeader header;
873 
878 
884 
888  struct ChallengeNonceP challenge;
889 
894  struct GNUNET_TIME_AbsoluteNBO origin_time;
895 
900  struct GNUNET_TIME_RelativeNBO validity_duration;
901 };
902 
903 
913 {
917  struct GNUNET_MessageHeader header;
918 
926  uint32_t seq GNUNET_PACKED;
927 
932  uint64_t inbound_window_size GNUNET_PACKED;
933 
939  uint64_t outbound_sent GNUNET_PACKED;
940 
949  uint64_t outbound_window_size GNUNET_PACKED;
950 
959  struct GNUNET_TIME_AbsoluteNBO sender_time;
960 };
961 
962 
964 
965 
970 {
974  CT_NONE = 0,
975 
979  CT_CORE = 1,
980 
985 
990 
995 };
996 
997 
1003 {
1008 
1013 
1018 
1023 
1030 };
1031 
1032 
1037 {
1038 
1043 
1048 
1052  struct ChallengeNonceP challenge;
1053 
1058  struct GNUNET_TIME_Absolute launch_time;
1059 };
1060 
1061 
1067 {
1071  uint64_t bytes_sent;
1072 
1077  uint64_t bytes_received;
1078 };
1079 
1080 
1085 {
1089  struct GNUNET_TIME_Relative aged_rtt;
1090 
1096 
1101  unsigned int last_age;
1102 };
1103 
1104 
1108 struct TransportClient;
1109 
1113 struct Neighbour;
1114 
1119 struct DistanceVector;
1120 
1125 struct Queue;
1126 
1130 struct PendingMessage;
1131 
1135 struct DistanceVectorHop;
1136 
1145 struct VirtualLink;
1146 
1147 
1153 {
1154 
1160 
1166 
1171 
1175  struct GNUNET_TRANSPORT_IncomingMessage im;
1176 
1181  uint16_t total_hops;
1182 };
1183 
1184 
1189 {
1190 
1195 
1200 
1204  struct VirtualLink *vl;
1205 
1209  uint16_t size;
1210 
1217  uint16_t isize;
1218 };
1219 
1220 
1230 {
1234  struct GNUNET_PeerIdentity target;
1235 
1241 
1247 
1252 
1257 
1262 
1267 
1275 
1281 
1285  struct Neighbour *n;
1286 
1291 
1297  struct GNUNET_TIME_Absolute n_challenge_time;
1298 
1304  struct GNUNET_TIME_Absolute last_fc_transmission;
1305 
1313  struct GNUNET_TIME_Absolute last_fc_timestamp;
1314 
1319  struct GNUNET_TIME_Relative last_fc_rtt;
1320 
1326 
1335 
1343 
1350 
1359 
1372 
1378 
1385 
1396 
1401  uint32_t fc_seq_gen;
1402 
1408  uint32_t last_fc_seq;
1409 
1422 };
1423 
1424 
1429 {
1430 
1436 
1442 
1449 
1456 
1463 
1470 
1477 
1484 
1488  struct AcknowledgementUUIDP ack_uuid;
1489 
1495 
1501 
1506  struct Queue *queue;
1507 
1511  struct GNUNET_TIME_Absolute transmission_time;
1512 
1516  uint16_t message_size;
1517 };
1518 
1519 
1524 {
1525 
1530 
1535 
1540 
1545 
1550 
1555 
1560 
1565 
1571  const struct GNUNET_PeerIdentity *path;
1572 
1578 
1586  struct GNUNET_TIME_Absolute path_valid_until;
1587 
1591  struct PerformanceData pd;
1592 
1598  unsigned int distance;
1599 };
1600 
1601 
1607 {
1608 
1612  struct GNUNET_PeerIdentity target;
1613 
1618 
1623 
1628 
1633  struct VirtualLink *vl;
1634 
1639  struct GNUNET_CRYPTO_EddsaSignature sender_sig;
1640 
1644  struct GNUNET_TIME_Absolute ephemeral_validity;
1645 
1649  struct GNUNET_TIME_Absolute monotime;
1650 
1654  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
1655 
1659  struct GNUNET_CRYPTO_EcdhePrivateKey private_key;
1660 };
1661 
1662 
/**
 * One entry in the list of entries that a `struct Queue` keeps between
 * its `queue_head` and `queue_tail`.
 *
 * NOTE(review): code in this file reads and writes `qe->pm`, yet no `pm`
 * member is visible below. The listing skips line numbers here, so the
 * member was most likely dropped by the extraction; confirm against the
 * original source.
 */
1672 struct QueueEntry
1673 {
1674 
/* Next entry in the list. */
1678  struct QueueEntry *next;
1679 
/* Previous entry in the list. */
1683  struct QueueEntry *prev;
1684 
/* Queue this entry belongs to. */
1688  struct Queue *queue;
1689 
1694 
/* Presumably the message ID of this entry (cf. `mid_gen` in
   `struct Queue`); TODO confirm. */
1698  uint64_t mid;
1699 };
1700 
1701 
/**
 * State kept per communication queue.
 *
 * NOTE(review): code in this file uses further members of this struct
 * (for example `neighbour`, `tc`, `queue_head`, `queue_tail`, `pa_head`,
 * `transmit_task`, `next_neighbour`, `next_client`) that are not visible
 * below. The listing skips line numbers, so they were most likely dropped
 * by the extraction; confirm against the original source.
 */
1706 struct Queue
1707 {
1712 
1717 
1722 
1727 
1732 
1737 
1742 
1747 
1752 
1757 
/* Address of the queue as a string; passed to the monitor notification
   and printed in log messages. */
1761  const char *address;
1762 
1768 
/* Absolute time until which the queue counts as validated; the link-down
   check takes the maximum of this value over all queues of a neighbour. */
1776  struct GNUNET_TIME_Absolute validated_until;
1777 
/* Performance data for this queue. */
1781  struct PerformanceData pd;
1782 
/* Presumably a generator for the `mid` values of queue entries; TODO
   confirm. */
1787  uint64_t mid_gen;
1788 
/* Presumably the queue identifier; TODO confirm. */
1792  uint32_t qid;
1793 
/* Presumably the maximum transmission unit of the queue; TODO confirm. */
1797  uint32_t mtu;
1798 
1803 
1808 
/* Number of entries currently in this queue; compared against
   QUEUE_LENGTH_LIMIT before scheduling a transmission and decremented
   as entries are removed. */
1812  unsigned int queue_length;
1813 
/* Network type of the queue; reported to monitors. */
1817  enum GNUNET_NetworkType nt;
1818 
1823 
/* Set to GNUNET_NO when transmission is throttled because a queue limit
   was reached. */
1828  int idle;
1829 };
1830 
1831 
1836 {
1837 
1842  struct MessageUUIDP msg_uuid;
1843 
1848 
1853 
1861  uint8_t *bitfield;
1862 
1866  struct GNUNET_TIME_Absolute reassembly_timeout;
1867 
1872  struct GNUNET_TIME_Absolute last_frag;
1873 
1877  uint16_t msg_size;
1878 
1883  uint16_t msg_missing;
1884 
1885  /* Followed by @e msg_size bytes of the (partially) defragmented original
1886  * message */
1887 
1888  /* Followed by @e bitfield data */
1889 };
1890 
1891 
1895 struct Neighbour
1896 {
1897 
1902 
1909 
1916 
1921 
1927 
1933 
1938 
1943 
1949 
1955 
1960  struct VirtualLink *vl;
1961 
1966  struct GNUNET_TIME_Absolute last_dv_learn_monotime;
1967 
1973 };
1974 
1975 
1981 {
1982 
1987 
1992 
1997 
2002 };
2003 
2004 
2008 struct PeerRequest
2009 {
2010 
2015 
2020 
2025 
2032 
2037 };
2038 
2039 
2044 {
2045 
2050 
2055 
2060 
2065 
2066 };
2067 
2068 
2095 struct PendingMessage
2096 {
2101 
2106 
2111 
2116 
2122 
2128 
2133 
2138 
2144 
2148  struct VirtualLink *vl;
2149 
2158  struct QueueEntry *qe;
2159 
2164 
2169 
2174 
2179 
2184 
2188  struct GNUNET_TIME_Absolute next_attempt;
2189 
2194  struct MessageUUIDP msg_uuid;
2195 
2200  unsigned long long logging_uuid;
2201 
2206 
2212 
2216  uint16_t bytes_msg;
2217 
2221  uint16_t frag_off;
2222 
2226  int16_t msg_uuid_set;
2227 
2228  /* Followed by @e bytes_msg to transmit */
2229 };
2230 
2231 
2236 {
2241  struct GNUNET_TIME_Absolute receive_time;
2242 
2246  struct AcknowledgementUUIDP ack_uuid;
2247 };
2248 
2249 
2255 {
2259  struct GNUNET_PeerIdentity target;
2260 
2265 
2272 
2276  struct GNUNET_TIME_Absolute min_transmission_time;
2277 
2283  uint32_t ack_counter;
2284 
2288  unsigned int num_acks;
2289 };
2290 
2291 
2296 {
2297 
2302 
2307 
2312 
2316  const char *address;
2317 
2322 
2327 
2333 
2337  uint32_t aid;
2338 
2343 };
2344 
2345 
2350 {
2351 
2356 
2361 
2366 
2371 
2376 
2377  union
2378  {
2379 
2383  struct
2384  {
2385 
2391 
2396 
2397  } core;
2398 
2402  struct
2403  {
2404 
2411 
2416 
2417  } monitor;
2418 
2419 
2423  struct
2424  {
2430 
2435 
2440 
2446 
2452 
2458  unsigned int total_queue_length;
2459 
2464 
2465  } communicator;
2466 
2470  struct
2471  {
2472 
2478 
2479  } application;
2480 
2481  } details;
2482 };
2483 
2484 
2490 {
2491 
2497 
2504  struct GNUNET_TIME_Absolute valid_until;
2505 
2510  struct GNUNET_TIME_Absolute validated_until;
2511 
2518  struct GNUNET_TIME_Absolute first_challenge_use;
2519 
2526  struct GNUNET_TIME_Absolute last_challenge_use;
2527 
2535  struct GNUNET_TIME_Absolute next_challenge;
2536 
2545  struct GNUNET_TIME_Relative challenge_backoff;
2546 
2551  struct GNUNET_TIME_Relative validation_rtt;
2552 
2560  struct ChallengeNonceP challenge;
2561 
2565  char *address;
2566 
2572  struct GNUNET_CONTAINER_HeapNode *hn;
2573 
2579 
2585  uint32_t last_window_consum_limit;
2586 
2591  int awaiting_queue;
2592 };
2593 
2594 
2602 {
2607 
2611  struct GNUNET_TIME_Absolute monotonic_time;
2612 
2617 
2621  struct GNUNET_CRYPTO_EcdhePublicKey last_ephemeral;
2622 
2628 
2633 
2639 
2645 
2650  size_t body_size;
2651 };
2652 
2653 
2658 
2663 
2668 
2673 
2678 
2683 
2689 
2695 
2701 
2707 
2713 
2719 
2725 
2730 
2735 
2740 
2747 
2752 
2757 
2762 
2768 
2774 
2780 static struct IncomingRequest *ir_head;
2781 
2785 static struct IncomingRequest *ir_tail;
2786 
2790 static unsigned int ir_total;
2791 
2795 static unsigned long long logging_uuid_gen;
2796 
2801 static unsigned int pa_count;
2802 
2812 
2813 
/**
 * Return the current absolute time expressed as a count of 15-minute
 * intervals: microseconds are divided by the length of one minute and
 * then by 15.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing.
 *
 * @return current time in units of 15 minutes, truncated to unsigned int
 */
2824 static unsigned int
2826 {
2827  struct GNUNET_TIME_Absolute now;
2828 
2829  now = GNUNET_TIME_absolute_get ();
2830  return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2831 }
2832 
2833 
2839 static void
2841 {
2842  GNUNET_CONTAINER_DLL_remove (ir_head, ir_tail, ir);
2843  GNUNET_assert (ir_total > 0);
2844  ir_total--;
2846  GNUNET_free (ir);
2847 }
2848 
2849 
/**
 * Release a pending acknowledgement `pa`: unlink it from the global
 * `pa_head`/`pa_tail` list, from the per-message and per-DV-hop lists it
 * is on, and free it.
 *
 * NOTE(review): the line carrying the function name and parameter list,
 * the statement inside the `q` branch and the start of the statement that
 * ends in `pa));` are missing from this listing.
 */
2855 static void
2857 {
2858  struct Queue *q = pa->queue;
2859  struct PendingMessage *pm = pa->pm;
2860  struct DistanceVectorHop *dvh = pa->dvh;
2861 
2862  GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
2863  pa_count--;
2864  if (NULL != q)
2865  {
2867  pa->queue = NULL;
2868  }
2869  if (NULL != pm)
2870  {
2871  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2872  pa->pm = NULL;
2873  }
2874  if (NULL != dvh)
2875  {
2876  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
/* NOTE(review): this clears pa->queue a second time; by symmetry with
   the two branches above it looks like pa->dvh was meant. It has no
   effect today because pa is freed a few lines below. */
2877  pa->queue = NULL;
2878  }
2881  &pa->ack_uuid.value,
2882  pa));
2883  GNUNET_free (pa);
2884 }
2885 
2886 
2895 static void
2897 {
2898  struct PendingMessage *frag;
2899 
2900  while (NULL != (frag = root->head_frag))
2901  {
2902  struct PendingAcknowledgement *pa;
2903 
2904  free_fragment_tree (frag);
2905  while (NULL != (pa = frag->pa_head))
2906  {
2907  GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
2908  pa->pm = NULL;
2909  }
2910  GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
2911  GNUNET_free (frag);
2912  }
2913 }
2914 
2915 
2923 static void
2925 {
2926  struct TransportClient *tc = pm->client;
2927  struct VirtualLink *vl = pm->vl;
2928  struct PendingAcknowledgement *pa;
2929 
2930  if (NULL != tc)
2931  {
2933  tc->details.core.pending_msg_head,
2934  tc->details.core.pending_msg_tail,
2935  pm);
2936  }
2937  if (NULL != vl)
2938  {
2940  vl->pending_msg_head,
2941  vl->pending_msg_tail,
2942  pm);
2943  }
2944  while (NULL != (pa = pm->pa_head))
2945  {
2946  GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
2947  pa->pm = NULL;
2948  }
2949 
2950  free_fragment_tree (pm);
2951  if (NULL != pm->qe)
2952  {
2953  GNUNET_assert (pm == pm->qe->pm);
2954  pm->qe->pm = NULL;
2955  }
2956  if (NULL != pm->bpm)
2957  {
2958  free_fragment_tree (pm->bpm);
2959  GNUNET_free (pm->bpm);
2960  }
2961  GNUNET_free (pm);
2962 }
2963 
2964 
2970 static void
2972 {
2973  struct PendingMessage *pm;
2974  struct CoreSentContext *csc;
2975 
2976  while (NULL != (pm = vl->pending_msg_head))
2977  free_pending_message (pm);
2979  GNUNET_CONTAINER_multipeermap_remove (links, &vl->target, vl));
2980  if (NULL != vl->visibility_task)
2981  {
2983  vl->visibility_task = NULL;
2984  }
2985  if (NULL != vl->fc_retransmit_task)
2986  {
2988  vl->fc_retransmit_task = NULL;
2989  }
2990  while (NULL != (csc = vl->csc_head))
2991  {
2993  GNUNET_assert (vl == csc->vl);
2994  csc->vl = NULL;
2995  }
2996  GNUNET_break (NULL == vl->n);
2997  GNUNET_break (NULL == vl->dv);
2998  GNUNET_free (vl);
2999 }
3000 
3001 
3007 static void
3009 {
3010  GNUNET_assert (
3011  GNUNET_YES ==
3012  GNUNET_CONTAINER_multipeermap_remove (validation_map, &vs->pid, vs));
3014  vs->hn = NULL;
3015  if (NULL != vs->sc)
3016  {
3018  vs->sc = NULL;
3019  }
3020  GNUNET_free (vs->address);
3021  GNUNET_free (vs);
3022 }
3023 
3024 
3031 static struct Neighbour *
3033 {
3034  return GNUNET_CONTAINER_multipeermap_get (neighbours, pid);
3035 }
3036 
3037 
3044 static struct VirtualLink *
3046 {
3047  return GNUNET_CONTAINER_multipeermap_get (links, pid);
3048 }
3049 
3050 
3055 {
3059  struct GNUNET_TIME_Absolute last_validation;
3060  struct GNUNET_TIME_Absolute valid_until;
3061  struct GNUNET_TIME_Absolute next_validation;
3062 
3067 
3072 
3077 
3082 };
3083 
3084 
3093 static void
3095 {
3096  struct Neighbour *n = dvh->next_hop;
3097  struct DistanceVector *dv = dvh->dv;
3098  struct PendingAcknowledgement *pa;
3099 
3100  while (NULL != (pa = dvh->pa_head))
3101  {
3102  GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
3103  pa->dvh = NULL;
3104  }
3105  GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3106  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, dvh);
3107  GNUNET_free (dvh);
3108 }
3109 
3110 
3117 static void
3118 check_link_down (void *cls);
3119 
3120 
3126 static void
3128 {
3130  "Informing CORE clients about disconnect from %s\n",
3131  GNUNET_i2s (pid));
3132  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3133  {
3134  struct GNUNET_MQ_Envelope *env;
3135  struct DisconnectInfoMessage *dim;
3136 
3137  if (CT_CORE != tc->type)
3138  continue;
3140  dim->peer = *pid;
3141  GNUNET_MQ_send (tc->mq, env);
3142  }
3143 }
3144 
3145 
3152 static void
3154 {
3155  struct DistanceVectorHop *dvh;
3156 
3157  while (NULL != (dvh = dv->dv_head))
3159  if (NULL == dv->dv_head)
3160  {
3161  struct VirtualLink *vl;
3162 
3163  GNUNET_assert (
3164  GNUNET_YES ==
3165  GNUNET_CONTAINER_multipeermap_remove (dv_routes, &dv->target, dv));
3166  if (NULL != (vl = dv->vl))
3167  {
3168  GNUNET_assert (dv == vl->dv);
3169  vl->dv = NULL;
3170  if (NULL == vl->n)
3171  {
3173  free_virtual_link (vl);
3174  }
3175  else
3176  {
3179  }
3180  dv->vl = NULL;
3181  }
3182 
3183  if (NULL != dv->timeout_task)
3184  {
3186  dv->timeout_task = NULL;
3187  }
3188  GNUNET_free (dv);
3189  }
3190 }
3191 
3192 
3206 static void
3208  const struct GNUNET_PeerIdentity *peer,
3209  const char *address,
3210  enum GNUNET_NetworkType nt,
3211  const struct MonitorEvent *me)
3212 {
3213  struct GNUNET_MQ_Envelope *env;
3214  struct GNUNET_TRANSPORT_MonitorData *md;
3215  size_t addr_len = strlen (address) + 1;
3216 
3217  env = GNUNET_MQ_msg_extra (md,
3218  addr_len,
3220  md->nt = htonl ((uint32_t) nt);
3221  md->peer = *peer;
3222  md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3223  md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3224  md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3225  md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3226  md->cs = htonl ((uint32_t) me->cs);
3227  md->num_msg_pending = htonl (me->num_msg_pending);
3228  md->num_bytes_pending = htonl (me->num_bytes_pending);
3229  memcpy (&md[1], address, addr_len);
3230  GNUNET_MQ_send (tc->mq, env);
3231 }
3232 
3233 
3243 static void
3245  const char *address,
3246  enum GNUNET_NetworkType nt,
3247  const struct MonitorEvent *me)
3248 {
3249  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3250  {
3251  if (CT_MONITOR != tc->type)
3252  continue;
3253  if (tc->details.monitor.one_shot)
3254  continue;
3255  if ((0 != GNUNET_is_zero (&tc->details.monitor.peer)) &&
3256  (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3257  continue;
3258  notify_monitor (tc, peer, address, nt, me);
3259  }
3260 }
3261 
3262 
/**
 * Called when a client connects: allocate a `struct TransportClient`,
 * remember the service client handle and message queue in it, and add it
 * to the `clients_head`/`clients_tail` list.
 *
 * NOTE(review): the line carrying the function name and first parameter
 * is missing from this listing; `cls` is used in the body, so it is
 * presumably that first parameter.
 *
 * @param client handle of the connecting client
 * @param mq message queue for talking to the client
 * @return the new `struct TransportClient`
 */
3272 static void *
3274  struct GNUNET_SERVICE_Client *client,
3275  struct GNUNET_MQ_Handle *mq)
3276 {
3277  struct TransportClient *tc;
3278 
3279  (void) cls;
3280  tc = GNUNET_new (struct TransportClient);
3281  tc->client = client;
3282  tc->mq = mq;
3283  GNUNET_CONTAINER_DLL_insert (clients_head, clients_tail, tc);
3284  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p connected\n", tc);
3285  return tc;
3286 }
3287 
3288 
3294 static void
3296 {
3297  struct Neighbour *n = rc->neighbour;
3298 
3302  rc->msg_uuid.uuid,
3303  rc));
3304  GNUNET_free (rc);
3305 }
3306 
3307 
3313 static void
3315 {
3316  struct Neighbour *n = cls;
3317  struct ReassemblyContext *rc;
3318 
3319  n->reassembly_timeout_task = NULL;
3320  while (NULL != (rc = GNUNET_CONTAINER_heap_peek (n->reassembly_heap)))
3321  {
3323  .rel_value_us)
3324  {
3326  continue;
3327  }
3332  n);
3333  return;
3334  }
3335 }
3336 
3337 
/**
 * Iterator callback over reassembly contexts.
 *
 * NOTE(review): as listed, the body never uses `rc`. The listing skips a
 * line number right before the return, so the statement that releases
 * the context was most likely dropped by the extraction; confirm against
 * the original source.
 *
 * @param cls unused
 * @param key unused
 * @param value a `struct ReassemblyContext`
 * @return #GNUNET_OK, always
 */
3346 static int
3347 free_reassembly_cb (void *cls, uint32_t key, void *value)
3348 {
3349  struct ReassemblyContext *rc = value;
3350 
3351  (void) cls;
3352  (void) key;
3354  return GNUNET_OK;
3355 }
3356 
3357 
3363 static void
3365 {
3366  struct DistanceVectorHop *dvh;
3367  struct VirtualLink *vl;
3368 
3369  GNUNET_assert (NULL == neighbour->queue_head);
3372  &neighbour->pid,
3373  neighbour));
3374  if (NULL != neighbour->reassembly_map)
3375  {
3378  NULL);
3380  neighbour->reassembly_map = NULL;
3382  neighbour->reassembly_heap = NULL;
3383  }
3384  while (NULL != (dvh = neighbour->dv_head))
3385  {
3386  struct DistanceVector *dv = dvh->dv;
3387 
3389  if (NULL == dv->dv_head)
3390  free_dv_route (dv);
3391  }
3392  if (NULL != neighbour->reassembly_timeout_task)
3393  {
3395  neighbour->reassembly_timeout_task = NULL;
3396  }
3397  if (NULL != neighbour->get)
3398  {
3399  GNUNET_PEERSTORE_iterate_cancel (neighbour->get);
3400  neighbour->get = NULL;
3401  }
3402  if (NULL != neighbour->sc)
3403  {
3404  GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3405  neighbour->sc = NULL;
3406  }
3407  if (NULL != (vl = neighbour->vl))
3408  {
3409  GNUNET_assert (neighbour == vl->n);
3410  vl->n = NULL;
3411  if (NULL == vl->dv)
3412  {
3414  free_virtual_link (vl);
3415  }
3416  else
3417  {
3420  }
3421  neighbour->vl = NULL;
3422  }
3423  GNUNET_free (neighbour);
3424 }
3425 
3426 
3433 static void
3435  const struct GNUNET_PeerIdentity *pid)
3436 {
3437  struct GNUNET_MQ_Envelope *env;
3438  struct ConnectInfoMessage *cim;
3439 
3440  GNUNET_assert (CT_CORE == tc->type);
3442  cim->id = *pid;
3443  GNUNET_MQ_send (tc->mq, env);
3444 }
3445 
3446 
3452 static void
3454 {
3456  "Informing CORE clients about connection to %s\n",
3457  GNUNET_i2s (pid));
3458  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3459  {
3460  if (CT_CORE != tc->type)
3461  continue;
3462  core_send_connect_info (tc, pid);
3463  }
3464 }
3465 
3466 
3474 static void
3475 transmit_on_queue (void *cls);
3476 
3477 
3485 static void
3488 {
3489  if (queue->tc->details.communicator.total_queue_length >=
3491  {
3493  GST_stats,
3494  "# Transmission throttled due to communicator queue limit",
3495  1,
3496  GNUNET_NO);
3497  queue->idle = GNUNET_NO;
3498  return;
3499  }
3500  if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3501  {
3502  GNUNET_STATISTICS_update (GST_stats,
3503  "# Transmission throttled due to queue queue limit",
3504  1,
3505  GNUNET_NO);
3506  queue->idle = GNUNET_NO;
3507  return;
3508  }
3509  /* queue might indeed be ready, schedule it */
3510  if (NULL != queue->transmit_task)
3512  queue->transmit_task =
3515  "Considering transmission on queue `%s' to %s\n",
3516  queue->address,
3517  GNUNET_i2s (&queue->neighbour->pid));
3518 }
3519 
3520 
3527 static void
3528 check_link_down (void *cls)
3529 {
3530  struct VirtualLink *vl = cls;
3531  struct DistanceVector *dv = vl->dv;
3532  struct Neighbour *n = vl->n;
3533  struct GNUNET_TIME_Absolute dvh_timeout;
3534  struct GNUNET_TIME_Absolute q_timeout;
3535 
3536  vl->visibility_task = NULL;
3537  dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3538  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3539  pos = pos->next_dv)
3540  dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout, pos->path_valid_until);
3541  if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3542  {
3543  vl->dv->vl = NULL;
3544  vl->dv = NULL;
3545  }
3546  q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3547  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3548  q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3549  if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3550  {
3551  vl->n->vl = NULL;
3552  vl->n = NULL;
3553  }
3554  if ((NULL == vl->n) && (NULL == vl->dv))
3555  {
3557  free_virtual_link (vl);
3558  return;
3559  }
3560  vl->visibility_task =
3561  GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3562  &check_link_down,
3563  vl);
3564 }
3565 
3566 
/**
 * Tear down a queue: unlink it from its neighbour, drop all of its queue
 * entries, notify monitors that it is down and free it. Afterwards the
 * virtual link of the neighbour is re-checked, and the neighbour itself
 * is freed if this was its last queue.
 *
 * NOTE(review): the line carrying the function name and parameter list
 * (presumably `free_queue (struct Queue *queue)`, going by the call in
 * the client disconnect handler) and several statements are missing from
 * this listing.
 */
3572 static void
3574 {
3575  struct Neighbour *neighbour = queue->neighbour;
3576  struct TransportClient *tc = queue->tc;
3577  struct MonitorEvent me = {.cs = GNUNET_TRANSPORT_CS_DOWN,
3579  struct QueueEntry *qe;
3580  int maxxed;
3581  struct PendingAcknowledgement *pa;
3582  struct VirtualLink *vl;
3583 
3584  if (NULL != queue->transmit_task)
3585  {
3587  queue->transmit_task = NULL;
3588  }
/* Detach pending acknowledgements from this queue; they are not freed
   here. */
3589  while (NULL != (pa = queue->pa_head))
3590  {
3591  GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3592  pa->queue = NULL;
3593  }
3594 
3595  GNUNET_CONTAINER_MDLL_remove (neighbour,
3596  neighbour->queue_head,
3597  neighbour->queue_tail,
3598  queue);
3600  tc->details.communicator.queue_head,
3601  tc->details.communicator.queue_tail,
3602  queue);
/* NOTE(review): `maxxed` is true when the communicator total is at or
   BELOW the limit, and the branch further down additionally requires the
   total to be ABOVE the limit after it has only been decremented. Those
   two conditions cannot both hold, so the "resume other queues" branch
   is unreachable as written. Both comparisons look inverted (expected:
   total >= limit before, total < limit after); confirm. */
3603  maxxed = (COMMUNICATOR_TOTAL_QUEUE_LIMIT >=
3604  tc->details.communicator.total_queue_length);
/* Drop every entry of this queue and break the link from the pending
   message back to the entry. */
3605  while (NULL != (qe = queue->queue_head))
3606  {
3607  GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3608  queue->queue_length--;
3609  tc->details.communicator.total_queue_length--;
3610  if (NULL != qe->pm)
3611  {
3612  GNUNET_assert (qe == qe->pm->qe);
3613  qe->pm->qe = NULL;
3614  }
3615  GNUNET_free (qe);
3616  }
3617  GNUNET_assert (0 == queue->queue_length);
3618  if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT <
3619  tc->details.communicator.total_queue_length))
3620  {
3621  /* Communicator dropped below threshold, resume all _other_ queues */
3623  GST_stats,
3624  "# Transmission throttled due to communicator queue limit",
3625  -1,
3626  GNUNET_NO);
3627  for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3628  s = s->next_client)
3630  }
/* `queue` is still valid here; it is freed right after the monitors
   have been told. */
3631  notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3632  GNUNET_free (queue);
3633 
3634  vl = lookup_virtual_link (&neighbour->pid);
3635  if ((NULL != vl) && (neighbour == vl->n))
3636  {
3638  check_link_down (vl);
3639  }
3640  if (NULL == neighbour->queue_head)
3641  {
3642  free_neighbour (neighbour);
3643  }
3644 }
3645 
3646 
3652 static void
3654 {
3655  struct TransportClient *tc = ale->tc;
3656 
3658  tc->details.communicator.addr_tail,
3659  ale);
3660  if (NULL != ale->sc)
3661  {
3663  ale->sc = NULL;
3664  }
3665  if (NULL != ale->st)
3666  {
3667  GNUNET_SCHEDULER_cancel (ale->st);
3668  ale->st = NULL;
3669  }
3670  GNUNET_free (ale);
3671 }
3672 
3673 
3682 static int
3684  const struct GNUNET_PeerIdentity *pid,
3685  void *value)
3686 {
3687  struct TransportClient *tc = cls;
3688  struct PeerRequest *pr = value;
3689 
3691  GNUNET_assert (
3692  GNUNET_YES ==
3694  pid,
3695  pr));
3696  GNUNET_free (pr);
3697 
3698  return GNUNET_OK;
3699 }
3700 
3701 
3710 static void
3712  struct GNUNET_SERVICE_Client *client,
3713  void *app_ctx)
3714 {
3715  struct TransportClient *tc = app_ctx;
3716 
3717  (void) cls;
3718  (void) client;
3720  "Client %p disconnected, cleaning up.\n",
3721  tc);
3722  GNUNET_CONTAINER_DLL_remove (clients_head, clients_tail, tc);
3723  switch (tc->type)
3724  {
3725  case CT_NONE:
3726  break;
3727  case CT_CORE: {
3728  struct PendingMessage *pm;
3729 
3730  while (NULL != (pm = tc->details.core.pending_msg_head))
3731  {
3733  tc->details.core.pending_msg_head,
3734  tc->details.core.pending_msg_tail,
3735  pm);
3736  pm->client = NULL;
3737  }
3738  }
3739  break;
3740  case CT_MONITOR:
3741  break;
3742  case CT_COMMUNICATOR: {
3743  struct Queue *q;
3744  struct AddressListEntry *ale;
3745 
3746  while (NULL != (q = tc->details.communicator.queue_head))
3747  free_queue (q);
3748  while (NULL != (ale = tc->details.communicator.addr_head))
3750  GNUNET_free (tc->details.communicator.address_prefix);
3751  }
3752  break;
3753  case CT_APPLICATION:
3756  tc);
3758  break;
3759  }
3760  GNUNET_free (tc);
3761 }
3762 
3763 
3773 static int
3775  const struct GNUNET_PeerIdentity *pid,
3776  void *value)
3777 {
3778  struct TransportClient *tc = cls;
3779 
3780  (void) value;
3782  "Telling new CORE client about existing connection to %s\n",
3783  GNUNET_i2s (pid));
3784  core_send_connect_info (tc, pid);
3785  return GNUNET_OK;
3786 }
3787 
3788 
/**
 * Handle a start message from a client: validate it and mark the client
 * as a CORE client.
 *
 * The message is rejected when bit 0 of the options is set and the
 * identity in the message differs from `GST_my_identity`, or when the
 * client already has a type other than CT_NONE.
 *
 * NOTE(review): several statements are missing from this listing (the
 * line numbers skip after each `GNUNET_break (0)`, before the log text
 * and around the trailing `tc);`), so what happens on rejection and after
 * registration is not visible here.
 *
 * @param cls the `struct TransportClient`
 * @param start the start message
 */
3797 static void
3798 handle_client_start (void *cls, const struct StartMessage *start)
3799 {
3800  struct TransportClient *tc = cls;
3801  uint32_t options;
3802 
3803  options = ntohl (start->options);
3804  if ((0 != (1 & options)) &&
3805  (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
3806  {
3807  /* client thinks this is a different peer, reject */
3808  GNUNET_break (0);
3810  return;
3811  }
/* A client may register its type only once. */
3812  if (CT_NONE != tc->type)
3813  {
3814  GNUNET_break (0);
3816  return;
3817  }
3818  tc->type = CT_CORE;
3820  "New CORE client with PID %s registered\n",
3821  GNUNET_i2s (&start->self));
3824  tc);
3826 }
3827 
3828 
3835 static int
3836 check_client_send (void *cls, const struct OutboundMessage *obm)
3837 {
3838  struct TransportClient *tc = cls;
3839  uint16_t size;
3840  const struct GNUNET_MessageHeader *obmm;
3841 
3842  if (CT_CORE != tc->type)
3843  {
3844  GNUNET_break (0);
3845  return GNUNET_SYSERR;
3846  }
3847  size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
3848  if (size < sizeof (struct GNUNET_MessageHeader))
3849  {
3850  GNUNET_break (0);
3851  return GNUNET_SYSERR;
3852  }
3853  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
3854  if (size != ntohs (obmm->size))
3855  {
3856  GNUNET_break (0);
3857  return GNUNET_SYSERR;
3858  }
3859  return GNUNET_OK;
3860 }
3861 
3862 
3870 static void
3872 {
3873  struct TransportClient *tc = pm->client;
3874  struct VirtualLink *vl = pm->vl;
3875 
3876  if (NULL != tc)
3877  {
3878  struct GNUNET_MQ_Envelope *env;
3879  struct SendOkMessage *som;
3880 
3882  som->peer = vl->target;
3884  "Confirming transmission of <%llu> to %s\n",
3885  pm->logging_uuid,
3886  GNUNET_i2s (&vl->target));
3887  GNUNET_MQ_send (tc->mq, env);
3888  }
3889  free_pending_message (pm);
3890 }
3891 
3892 
/**
 * Select up to @a hops_array_length hops of the distance vector `dv` and
 * store them in @a hops_array. When more hops are eligible than fit, a
 * weighted random choice is made in which a hop's weight is
 * MAX_DV_HOPS_ALLOWED minus its distance.
 *
 * Unless `options` contains RMO_UNCONFIRMED_ALLOWED, hops whose
 * `path_valid_until` has passed are not eligible.
 *
 * NOTE(review): the lines carrying the function name and the first
 * parameters (`dv` and `options` are used in the body), and the
 * right-hand side of the `choices[i] =` assignment, are missing from
 * this listing.
 *
 * @param hops_array where to store the selected hops
 * @param hops_array_length number of slots in @a hops_array
 * @return number of entries stored in @a hops_array
 */
3902 static unsigned int
3905  struct DistanceVectorHop **hops_array,
3906  unsigned int hops_array_length)
3907 {
3908  uint64_t choices[hops_array_length];
3909  uint64_t num_dv;
3910  unsigned int dv_count;
3911 
3912  /* Pick random vectors, but weighted by distance, giving more weight
3913  to shorter vectors */
3914  num_dv = 0;
3915  dv_count = 0;
/* First pass: count the eligible hops and sum up their weights. */
3916  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3917  pos = pos->next_dv)
3918  {
3919  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
3920  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
3921  .rel_value_us == 0))
3922  continue; /* pos unconfirmed and confirmed required */
3923  num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
3924  dv_count++;
3925  }
3926  if (0 == dv_count)
3927  return 0;
3928  if (dv_count <= hops_array_length)
3929  {
/* NOTE(review): dv_count above counted only eligible hops, but this loop
   copies EVERY hop of the list without repeating the eligibility test.
   When ineligible hops exist it therefore returns hops the caller
   excluded, and it can write more than hops_array_length entries into
   hops_array. Looks like the filter from the first pass is missing
   here; confirm. */
3930  dv_count = 0;
3931  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3932  pos = pos->next_dv)
3933  hops_array[dv_count++] = pos;
3934  return dv_count;
3935  }
/* Draw hops_array_length pairwise distinct values into choices[],
   redrawing whenever a value repeats an earlier one. */
3936  for (unsigned int i = 0; i < hops_array_length; i++)
3937  {
3938  int ok = GNUNET_NO;
3939  while (GNUNET_NO == ok)
3940  {
3941  choices[i] =
3943  ok = GNUNET_YES;
3944  for (unsigned int j = 0; j < i; j++)
3945  if (choices[i] == choices[j])
3946  {
3947  ok = GNUNET_NO;
3948  break;
3949  }
3950  }
3951  }
3952  dv_count = 0;
3953  num_dv = 0;
/* Second pass: each eligible hop covers the weight interval
   [num_dv, num_dv + delta); it is stored once for every drawn value that
   falls into its interval. */
3954  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3955  pos = pos->next_dv)
3956  {
3957  uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
3958 
3959  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
3960  (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
3961  .rel_value_us == 0))
3962  continue; /* pos unconfirmed and confirmed required */
3963  for (unsigned int i = 0; i < hops_array_length; i++)
3964  if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
3965  hops_array[dv_count++] = pos;
3966  num_dv += delta;
3967  }
3968  return dv_count;
3969 }
3970 
3971 
/* Validation callback for a CommunicatorAvailableMessage.
   Fails with GNUNET_SYSERR if the client already has a type; otherwise
   it marks the client as CT_COMMUNICATOR (note: the type is assigned
   here, in the check function).  A message with no trailing bytes is
   accepted as coming from a receive-only communicator.
   NOTE(review): the name line (3979) and line 3995 are elided;
   presumably 3995 validates the trailing address prefix - confirm. */
3978 static int
3980  void *cls,
3981  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
3982 {
3983  struct TransportClient *tc = cls;
3984  uint16_t size;
3985 
3986  if (CT_NONE != tc->type)
3987  {
3988  GNUNET_break (0);
3989  return GNUNET_SYSERR;
3990  }
3991  tc->type = CT_COMMUNICATOR;
3992  size = ntohs (cam->header.size) - sizeof (*cam);
3993  if (0 == size)
3994  return GNUNET_OK; /* receive-only communicator */
3996  return GNUNET_OK;
3997 }
3998 
3999 
/* Complete processing of a communicator message context `cmc` and free
   it.  When the incoming message had fc_on set, an IncomingMessageAck
   echoing fc_id and sender is sent on the communicator's message queue.
   NOTE(review): the signature line (4006), the envelope allocation
   (4014) and line 4020 are elided from this listing. */
4005 static void
4007 {
4008  if (0 != ntohl (cmc->im.fc_on))
4009  {
4010  /* send ACK when done to communicator for flow control! */
4011  struct GNUNET_MQ_Envelope *env;
4012  struct GNUNET_TRANSPORT_IncomingMessageAck *ack;
4013 
4015  ack->reserved = htonl (0);
4016  ack->fc_id = cmc->im.fc_id;
4017  ack->sender = cmc->im.sender;
4018  GNUNET_MQ_send (cmc->tc->mq, env);
4019  }
4021  GNUNET_free (cmc);
4022 }
4023 
4024 
/* Handler for a RecvOkMessage from a client.
   Only CT_CORE clients may send it.  The virtual link for rom->peer is
   looked up; if none exists, a statistic is bumped and the message is
   ignored.  Otherwise core_recv_window grows by increase_window_delta
   and, once the window is positive, every communicator message context
   waiting on the link is finished via finish_cmc_handling().
   NOTE(review): lines 4045, 4055 and 4065 are elided; 4065 presumably
   unlinks `cmc` from the list (otherwise the loop below could not
   advance) - confirm against the full source. */
4034 static void
4035 handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4036 {
4037  struct TransportClient *tc = cls;
4038  struct VirtualLink *vl;
4039  uint32_t delta;
4040  struct CommunicatorMessageContext *cmc;
4041 
4042  if (CT_CORE != tc->type)
4043  {
4044  GNUNET_break (0);
4046  return;
4047  }
4048  vl = lookup_virtual_link (&rom->peer);
4049  if (NULL == vl)
4050  {
4051  GNUNET_STATISTICS_update (GST_stats,
4052  "# RECV_OK dropped: virtual link unknown",
4053  1,
4054  GNUNET_NO);
4056  return;
4057  }
4058  delta = ntohl (rom->increase_window_delta);
4059  vl->core_recv_window += delta;
/* NOTE(review): "<= 0" is only meaningful if core_recv_window is a
   signed type; its declaration is not visible here. */
4060  if (vl->core_recv_window <= 0)
4061  return;
4062  /* resume communicators */
4063  while (NULL != (cmc = vl->cmc_tail))
4064  {
4066  finish_cmc_handling (cmc);
4067  }
4068 }
4069 
4070 
/* Handler for a CommunicatorAvailableMessage.
   With no trailing bytes the communicator is receive-only and nothing is
   recorded.  Otherwise the trailing string is duplicated into
   tc->details.communicator.address_prefix and the characteristics field
   `cc` is converted from network byte order and stored.
   NOTE(review): the name line (4078) and lines 4088, 4096, 4099 are
   elided from this listing. */
4077 static void
4079  void *cls,
4080  const struct GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
4081 {
4082  struct TransportClient *tc = cls;
4083  uint16_t size;
4084 
4085  size = ntohs (cam->header.size) - sizeof (*cam);
4086  if (0 == size)
4087  {
4089  "Receive-only communicator connected\n");
4090  return; /* receive-only communicator */
4091  }
4092  tc->details.communicator.address_prefix =
4093  GNUNET_strdup ((const char *) &cam[1]);
4094  tc->details.communicator.cc =
4095  (enum GNUNET_TRANSPORT_CommunicatorCharacteristics) ntohl (cam->cc);
4097  "Communicator with prefix `%s' connected\n",
4098  tc->details.communicator.address_prefix);
4100 }
4101 
4102 
/* Validation callback for a CommunicatorBackchannel message.
   The payload after the fixed header must consist of an embedded
   message (`inbox`) followed by a non-empty, 0-terminated string.
   Returns GNUNET_OK when that layout holds, GNUNET_SYSERR otherwise.
   NOTE(review): the name line (4111) is elided from this listing. */
4110 static int
4112  void *cls,
4113  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4114 {
4115  const struct GNUNET_MessageHeader *inbox;
4116  const char *is;
4117  uint16_t msize;
4118  uint16_t isize;
4119 
4120  (void) cls;
4121  msize = ntohs (cb->header.size) - sizeof (*cb);
4122  inbox = (const struct GNUNET_MessageHeader *) &cb[1];
/* NOTE(review): inbox->size is read before anything establishes that
   msize is at least sizeof (struct GNUNET_MessageHeader); with a very
   short payload this read may reach past the received message.
   Confirm whether a caller-side guarantee exists. */
4123  isize = ntohs (inbox->size);
4124  if (isize >= msize)
4125  {
4126  GNUNET_break (0);
4127  return GNUNET_SYSERR;
4128  }
/* Advance past the embedded message; what remains is the string. */
4129  is = (const char *) inbox;
4130  is += isize;
4131  msize -= isize;
4132  GNUNET_assert (0 < msize);
4133  if ('\0' != is[msize - 1])
4134  {
4135  GNUNET_break (0);
4136  return GNUNET_SYSERR;
4137  }
4138  return GNUNET_OK;
4139 }
4140 
4141 
/* Refresh the ephemeral key material stored in a distance vector `dv`.
   Returns early when the (elided) condition at 4153-4154 is non-zero;
   otherwise ephemeral_validity is reassigned and a fresh signature over
   an EphemeralConfirmationPS (target + ephemeral key) is created with
   GST_my_private_key and stored in dv->sender_sig.
   NOTE(review): most of this function (4149, 4154, 4156, 4158-4162) is
   elided from this listing, including the early-return condition, key
   generation and the signature purpose; treat this summary as partial. */
4148 static void
4150 {
4151  struct EphemeralConfirmationPS ec;
4152 
4153  if (0 !=
4155  return;
4157  dv->ephemeral_validity =
4163  ec.purpose.size = htonl (sizeof (ec));
4164  ec.target = dv->target;
4165  ec.ephemeral_key = dv->ephemeral_key;
4166  GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
4167  &ec.purpose,
4168  &dv->sender_sig));
4169 }
4170 
4171 
/* Hand `payload_size` bytes of `payload` to the communicator behind
   `queue` for transmission to the queue's neighbour.
   `pm` may be NULL; when given, the new QueueEntry and the pending
   message are cross-linked (pm must not already be in a queue).
   A SendMessageTo envelope is built, a QueueEntry is appended to the
   queue's list, the per-queue and per-communicator length counters are
   incremented, and the envelope is sent on the communicator's MQ.
   NOTE(review): the first signature line (4182) and lines 4193, 4201,
   4223 are elided from this listing. */
4181 static void
4183  struct PendingMessage *pm,
4184  const void *payload,
4185  size_t payload_size)
4186 {
4187  struct Neighbour *n = queue->neighbour;
4188  struct GNUNET_TRANSPORT_SendMessageTo *smt;
4189  struct GNUNET_MQ_Envelope *env;
4190 
4191  queue->idle = GNUNET_NO;
4192  GNUNET_log (
4194  "Queueing %u bytes of payload for transmission <%llu> on queue %llu to %s\n",
4195  (unsigned int) payload_size,
4196  (NULL == pm) ? 0 : pm->logging_uuid,
4197  (unsigned long long) queue->qid,
4198  GNUNET_i2s (&queue->neighbour->pid));
4199  env = GNUNET_MQ_msg_extra (smt,
4200  payload_size,
4202  smt->qid = queue->qid;
4203  smt->mid = queue->mid_gen;
4204  smt->receiver = n->pid;
4205  memcpy (&smt[1], payload, payload_size);
4206  {
4207  /* Pass the env to the communicator of queue for transmission. */
4208  struct QueueEntry *qe;
4209 
4210  qe = GNUNET_new (struct QueueEntry);
/* Same message id as written into smt->mid above; the generator is
   advanced here. */
4211  qe->mid = queue->mid_gen++;
4212  qe->queue = queue;
4213  if (NULL != pm)
4214  {
4215  qe->pm = pm;
4216  GNUNET_assert (NULL == pm->qe);
4217  pm->qe = qe;
4218  }
4219  GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4220  GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4221  queue->queue_length++;
4222  queue->tc->details.communicator.total_queue_length++;
/* NOTE(review): idle was already set to GNUNET_NO at the top of the
   function, so the two conditional assignments below appear
   redundant as far as this listing shows. */
4224  queue->tc->details.communicator.total_queue_length)
4225  queue->idle = GNUNET_NO;
4226  if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4227  queue->idle = GNUNET_NO;
4228  GNUNET_MQ_send (queue->tc->mq, env);
4229  }
4230 }
4231 
4232 
/* Send message `hdr` directly to neighbour `n` over one or (with
   RMO_REDUNDANT) two of its queues and return the smallest aged RTT
   among the queues used.
   First pass counts candidate queues, second pass transmits on the
   queue(s) whose candidate index equals sel1 / sel2.
   NOTE(review): the first and last signature lines (4244, 4246), the
   statement ending the "no candidates" branch (4280), the assignments
   of sel1 / rtt (4283-4284) and of sel2 (4288) are elided from this
   listing. */
4243 static struct GNUNET_TIME_Relative
4245  const struct GNUNET_MessageHeader *hdr,
4247 {
4248  struct GNUNET_TIME_Absolute now;
4249  unsigned int candidates;
4250  unsigned int sel1;
4251  unsigned int sel2;
4252  struct GNUNET_TIME_Relative rtt;
4253 
4254  /* Pick one or two 'random' queues from n (under constraints of options) */
4255  now = GNUNET_TIME_absolute_get ();
4256  /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4257  weight in the future; weight could be assigned by observed
4258  bandwidth (note: not sure if we should do this for this type
4259  of control traffic though). */
4260  candidates = 0;
4261  for (struct Queue *pos = n->queue_head; NULL != pos;
4262  pos = pos->next_neighbour)
4263  {
/* NOTE(review): as written, a queue qualifies when
   RMO_UNCONFIRMED_ALLOWED is NOT set, regardless of validation.
   That looks inverted ("0 !=" would accept unvalidated queues only
   when unconfirmed use is allowed).  The same test is repeated in
   the second loop.  Confirm intent. */
4264  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
4265  (pos->validated_until.abs_value_us > now.abs_value_us))
4266  candidates++;
4267  }
4268  if (0 == candidates)
4269  {
4270  /* This can happen rarely if the last confirmed queue timed
4271  out just as we were beginning to process this message. */
4273  "Could not route message of type %u to %s: no valid queue\n",
4274  ntohs (hdr->type),
4275  GNUNET_i2s (&n->pid));
4276  GNUNET_STATISTICS_update (GST_stats,
4277  "# route selection failed (all no valid queue)",
4278  1,
4279  GNUNET_NO);
4281  }
4282 
4285  if (0 == (options & RMO_REDUNDANT))
4286  sel2 = candidates; /* picks none! */
4287  else
4289  candidates = 0;
4290  for (struct Queue *pos = n->queue_head; NULL != pos;
4291  pos = pos->next_neighbour)
4292  {
4293  if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) ||
4294  (pos->validated_until.abs_value_us > now.abs_value_us))
4295  {
4296  if ((sel1 == candidates) || (sel2 == candidates))
4297  {
4299  "Routing message of type %u to %s using %s (#%u)\n",
4300  ntohs (hdr->type),
4301  GNUNET_i2s (&n->pid),
4302  pos->address,
4303  (sel1 == candidates) ? 1 : 2);
4304  rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4305  queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4306  }
4307  candidates++;
4308  }
4309  }
4310  return rtt;
4311 }
4312 
4313 
/* State for encrypting/decrypting one DV box: a libgcrypt cipher handle
   plus the derived key material.  The `material` member is filled as one
   contiguous blob by the key-derivation routine in this file, so the
   order and size of its fields matter.
   NOTE(review): the "struct ..." header line (4317) and the original
   per-field doc comments are elided from this listing. */
4318 {
/* Cipher handle; opened, keyed and given its counter when the key
   state is set up. */
4322  gcry_cipher_hd_t cipher;
4323 
4327  struct
4328  {
4329 
/* Key passed to GNUNET_CRYPTO_hmac() for authentication. */
4333  struct GNUNET_CRYPTO_AuthKey hmac_key;
4334 
/* 256-bit symmetric key handed to gcry_cipher_setkey(). */
4338  char aes_key[256 / 8];
4339 
/* 128-bit initial counter handed to gcry_cipher_setctr(). */
4343  char aes_ctr[128 / 8];
4344 
4345  } material;
4346 };
4347 
4348 
/* Fill `key` from key material `km` and initialization vector `iv`:
   key->material is derived with GNUNET_CRYPTO_kdf() using the context
   string "transport-backchannel-key", then an AES-256 cipher in CTR
   mode is opened and given the derived key and counter.
   NOTE(review): the name/first-parameter line (4358) and lines 4363,
   4372 are elided from this listing.
   NOTE(review): the return values of gcry_cipher_open(),
   gcry_cipher_setkey() and gcry_cipher_setctr() are not checked here. */
4357 static void
4359  const struct GNUNET_ShortHashCode *iv,
4360  struct DVKeyState *key)
4361 {
4362  /* must match #dh_key_derive_eph_pub */
4364  GNUNET_CRYPTO_kdf (&key->material,
4365  sizeof (key->material),
4366  "transport-backchannel-key",
4367  strlen ("transport-backchannel-key"),
4368  &km,
4369  sizeof (km),
4370  iv,
4371  sizeof (*iv)));
4373  "Deriving backchannel key based on KM %s and IV %s\n",
4374  GNUNET_h2s (km),
4375  GNUNET_sh2s (iv));
4376  gcry_cipher_open (&key->cipher,
4377  GCRY_CIPHER_AES256 /* low level: go for speed */,
4378  GCRY_CIPHER_MODE_CTR,
4379  0 /* flags */);
4380  gcry_cipher_setkey (key->cipher,
4381  &key->material.aes_key,
4382  sizeof (key->material.aes_key));
4383  gcry_cipher_setctr (key->cipher,
4384  &key->material.aes_ctr,
4385  sizeof (key->material.aes_ctr));
4386 }
4387 
4388 
/* Derive a DVKeyState from our ephemeral private key and the target
   peer's public key: a key-exchange result `km` is computed (call
   elided) and passed with `iv` to dv_setup_key_state_from_km().
   NOTE(review): the name line (4399) and the start of the key-exchange
   call (4407) are elided from this listing. */
4398 static void
4400  const struct GNUNET_CRYPTO_EcdhePrivateKey *priv_ephemeral,
4401  const struct GNUNET_PeerIdentity *target,
4402  const struct GNUNET_ShortHashCode *iv,
4403  struct DVKeyState *key)
4404 {
4405  struct GNUNET_HashCode km;
4406 
4408  &target->public_key,
4409  &km));
4410  dv_setup_key_state_from_km (&km, iv, key);
4411 }
4412 
4413 
/* Derive a DVKeyState on the receiving side: combines GST_my_private_key
   with the sender's ephemeral public key via GNUNET_CRYPTO_eddsa_ecdh()
   (asserted to succeed) and feeds the result plus `iv` into
   dv_setup_key_state_from_km().
   NOTE(review): the name/first-parameter line (4424), which declares
   `pub_ephemeral`, is elided from this listing. */
4423 static void
4425  const struct GNUNET_ShortHashCode *iv,
4426  struct DVKeyState *key)
4427 {
4428  struct GNUNET_HashCode km;
4429 
4430  GNUNET_assert (GNUNET_YES == GNUNET_CRYPTO_eddsa_ecdh (GST_my_private_key,
4431  pub_ephemeral,
4432  &km));
4433  dv_setup_key_state_from_km (&km, iv, key);
4434 }
4435 
4436 
4446 static void
4447 dv_hmac (const struct DVKeyState *key,
4448  struct GNUNET_HashCode *hmac,
4449  const void *data,
4450  size_t data_size)
4451 {
4452  GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4453 }
4454 
4455 
4465 static void
4466 dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4467 {
4468  GNUNET_assert (0 ==
4469  gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4470 }
4471 
4472 
/* Decrypt `out_size` bytes of ciphertext `ciph` into `out` with the
   cipher held in `key`; aborts via GNUNET_assert if
   gcry_cipher_decrypt() returns non-zero.
   NOTE(review): the name/first-parameter line (4483) is elided from
   this listing. */
4482 static void
4484  void *out,
4485  const void *ciph,
4486  size_t out_size)
4487 {
4488  GNUNET_assert (
4489  0 == gcry_cipher_decrypt (key->cipher, out, out_size, ciph, out_size));
4490 }
4491 
4492 
/* Release a key state: closes the cipher handle and wipes the derived
   key material with GNUNET_CRYPTO_zero_keys().
   NOTE(review): the signature line (4499) is elided from this listing. */
4498 static void
4500 {
4501  gcry_cipher_close (key->cipher);
4502  GNUNET_CRYPTO_zero_keys (&key->material, sizeof (key->material));
4503 }
4504 
4505 
/* Callback type invoked once per selected DV path with the fully
   encapsulated message `hdr` and the neighbour `next_hop` it should be
   handed to; `cls` is the caller-supplied closure.
   NOTE(review): the final parameter line (4519) is elided from this
   listing; callers in this file pass routing options there. */
4516 typedef void (*DVMessageHandler) (void *cls,
4517  struct Neighbour *next_hop,
4518  const struct GNUNET_MessageHeader *hdr,
4520 
/* Wrap message `hdr` into a DV box for `dv->target` and pass one copy
   per selected hop in `dvhs` (num_dvhs entries) to the callback `use`.
   The payload header and body are encrypted once, an HMAC over the
   ciphertext is stored in the box header, and for each hop the path
   (hop path followed by the target) is inserted between box header and
   ciphertext.  Returns the smallest aged RTT of the hops used.
   NOTE(review): the first and last signature lines (4535, 4541) and
   lines 4553, 4558, 4571, 4596, 4605 are elided from this listing; the
   initial value of `rtt` is presumably set on one of them - confirm. */
4534 static struct GNUNET_TIME_Relative
4536  unsigned int num_dvhs,
4537  struct DistanceVectorHop **dvhs,
4538  const struct GNUNET_MessageHeader *hdr,
4539  DVMessageHandler use,
4540  void *use_cls,
4542 {
4543  struct TransportDVBoxMessage box_hdr;
4544  struct TransportDVBoxPayloadP payload_hdr;
4545  uint16_t enc_body_size = ntohs (hdr->size);
4546  char enc[sizeof (struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
4547  struct TransportDVBoxPayloadP *enc_payload_hdr =
4548  (struct TransportDVBoxPayloadP *) enc;
4549  struct DVKeyState key;
4550  struct GNUNET_TIME_Relative rtt;
4551 
4552  /* Encrypt payload */
4554  box_hdr.total_hops = htons (0);
4555  update_ephemeral (dv);
4556  box_hdr.ephemeral_key = dv->ephemeral_key;
4557  payload_hdr.sender_sig = dv->sender_sig;
4559  &box_hdr.iv,
4560  sizeof (box_hdr.iv));
4561  dh_key_derive_eph_pid (&dv->private_key, &dv->target, &box_hdr.iv, &key);
4562  payload_hdr.sender = GST_my_identity;
4563  payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
/* Header first, then body, through the same cipher handle. */
4564  dv_encrypt (&key, &payload_hdr, enc_payload_hdr, sizeof (payload_hdr));
4565  dv_encrypt (&key,
4566  hdr,
4567  &enc[sizeof (struct TransportDVBoxPayloadP)],
4568  enc_body_size);
4569  dv_hmac (&key, &box_hdr.hmac, enc, sizeof (enc));
4570  dv_key_clean (&key);
4572  /* For each selected path, take the pre-computed header and body
4573  and add the path in the middle of the message; then send it. */
4574  for (unsigned int i = 0; i < num_dvhs; i++)
4575  {
4576  struct DistanceVectorHop *dvh = dvhs[i];
4577  unsigned int num_hops = dvh->distance + 1;
4578  char buf[sizeof (struct TransportDVBoxMessage) +
4579  sizeof (struct GNUNET_PeerIdentity) * num_hops +
4580  sizeof (struct TransportDVBoxPayloadP) +
4581  enc_body_size] GNUNET_ALIGN;
4582  struct GNUNET_PeerIdentity *dhops;
4583 
4584  box_hdr.header.size = htons (sizeof (buf));
4585  box_hdr.num_hops = htons (num_hops);
4586  memcpy (buf, &box_hdr, sizeof (box_hdr));
4587  dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof (box_hdr)];
4588  memcpy (dhops,
4589  dvh->path,
4590  dvh->distance * sizeof (struct GNUNET_PeerIdentity));
4591  dhops[dvh->distance] = dv->target;
4592  if (GNUNET_EXTRA_LOGGING > 0)
4593  {
4594  char *path;
4595 
/* NOTE(review): "j <= num_hops" visits dhops[num_hops], one past
   the num_hops path entries (valid indices are 0..num_hops-1);
   that slot has not been filled yet at this point.  Looks like
   an off-by-one - confirm. */
4597  for (unsigned int j = 0; j <= num_hops; j++)
4598  {
4599  char *tmp;
4600 
4601  GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
4602  GNUNET_free (path);
4603  path = tmp;
4604  }
4606  "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
4607  ntohs (hdr->type),
4608  GNUNET_i2s (&dv->target),
4609  i + 1,
4610  num_dvhs + 1,
4611  path);
4612  GNUNET_free (path);
4613  }
4614  rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
/* Ciphertext goes directly after the last path entry. */
4615  memcpy (&dhops[num_hops], enc, sizeof (enc));
4616  use (use_cls,
4617  dvh->next_hop,
4618  (const struct GNUNET_MessageHeader *) buf,
4619  options);
4620  }
4621  return rtt;
4622 }
4623 
4624 
/* Adapter that forwards an encapsulated DV message to `next_hop` via
   route_via_neighbour(), ignoring the closure and the returned RTT.
   NOTE(review): the name line (4635) and the last parameter line (4638,
   declaring `options`) are elided from this listing. */
4634 static void
4636  struct Neighbour *next_hop,
4637  const struct GNUNET_MessageHeader *hdr,
4639 {
4640  (void) cls;
4641  (void) route_via_neighbour (next_hop, hdr, options);
4642 }
4643 
4644 
/* Route message `hdr` to peer `target` directly, via DV, or both,
   depending on `options`, and return the smaller of the RTTs reported
   by the methods used.  A virtual link for `target` must exist
   (asserted).  Without RMO_REDUNDANT and with both methods available,
   one of them is dropped by an (elided) coin flip.
   NOTE(review): the first and last signature lines (4657, 4659) and
   lines 4682, 4690, 4692, 4701, 4709-4710, 4726, 4734 are elided from
   this listing; the initial values of rtt1/rtt2 and the exit of the
   "no route" branch are presumably on those lines - confirm. */
4656 static struct GNUNET_TIME_Relative
4658  const struct GNUNET_MessageHeader *hdr,
4660 {
4661  struct VirtualLink *vl;
4662  struct Neighbour *n;
4663  struct DistanceVector *dv;
4664  struct GNUNET_TIME_Relative rtt1;
4665  struct GNUNET_TIME_Relative rtt2;
4666 
4667  vl = lookup_virtual_link (target);
4668  GNUNET_assert (NULL != vl);
4669  n = vl->n;
4670  dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
/* NOTE(review): the branch below runs when RMO_UNCONFIRMED_ALLOWED is
   NOT set, yet it falls back to lookups outside the virtual link;
   the existing comment describes the opposite situation.  Confirm
   whether "0 !=" was intended. */
4671  if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
4672  {
4673  /* if confirmed is required, and we do not have anything
4674  confirmed, drop respective options */
4675  if (NULL == n)
4676  n = lookup_neighbour (target);
4677  if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
4678  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, target);
4679  }
4680  if ((NULL == n) && (NULL == dv))
4681  {
4683  "Cannot route message of type %u to %s: no route\n",
4684  ntohs (hdr->type),
4685  GNUNET_i2s (target));
4686  GNUNET_STATISTICS_update (GST_stats,
4687  "# Messages dropped in routing: no acceptable method",
4688  1,
4689  GNUNET_NO);
4691  }
4693  "Routing message of type %u to %s with options %X\n",
4694  ntohs (hdr->type),
4695  GNUNET_i2s (target),
4696  (unsigned int) options);
4697  /* If both dv and n are possible and we must choose:
4698  flip a coin for the choice between the two; for now 50/50 */
4699  if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
4700  {
4702  n = NULL;
4703  else
4704  dv = NULL;
4705  }
4706  if ((NULL != n) && (NULL != dv))
4707  options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
4708  enough for redundancy, so clear the flag. */
4711  if (NULL != n)
4712  {
4713  rtt1 = route_via_neighbour (n, hdr, options);
4714  }
4715  if (NULL != dv)
4716  {
/* Two slots: one hop normally, two when RMO_REDUNDANT is still set. */
4717  struct DistanceVectorHop *hops[2];
4718  unsigned int res;
4719 
4720  res = pick_random_dv_hops (dv,
4721  options,
4722  hops,
4723  (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
4724  if (0 == res)
4725  {
4727  "Failed to route message, could not determine DV path\n");
4728  return rtt1;
4729  }
4730  rtt2 = encapsulate_for_dv (dv,
4731  res,
4732  hops,
4733  hdr,
4735  NULL,
4736  options & (~RMO_REDUNDANT));
4737  }
4738  return GNUNET_TIME_relative_min (rtt1, rtt2);
4739 }
4740 
4741 
/* Build and send a flow-control message for the virtual link passed as
   `cls`.  The FC message gets the next fc_seq_gen value and the current
   monotonic time as sender_time; vl->last_fc_transmission is updated.
   If the (elided) send reports GNUNET_TIME_UNIT_FOREVER_REL as RTT the
   attempt is treated as failed; otherwise vl->last_fc_rtt is updated.
   A retransmission task is stored in vl->fc_retransmit_task at the end.
   NOTE(review): large parts of this function (4749, 4757, 4768, 4775,
   4778-4780, 4782, 4785-4786, 4789-4790, 4798, 4800) are elided from
   this listing, including how `duration` and `rtt` are computed. */
4748 static void
4750 {
4751  struct VirtualLink *vl = cls;
4752  struct GNUNET_TIME_Absolute monotime;
4753  struct TransportFlowControlMessage fc;
4754  struct GNUNET_TIME_Relative duration;
4755  struct GNUNET_TIME_Relative rtt;
4756 
4758  /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
4759  it always! */
4760  /* For example, we should probably ONLY do this if a bit more than
4761  an RTT has passed, or if the window changed "significantly" since
4762  then. See vl->last_fc_rtt! NOTE: to do this properly, we also
4763  need an estimate for the bandwidth-delay-product for the entire
4764  VL, as that determines "significantly". We have the delay, but
4765  the bandwidth statistics need to be added for the VL!*/
4766  (void) duration;
4767 
4769  "Sending FC seq %u to %s with new window %llu\n",
4770  (unsigned int) vl->fc_seq_gen,
4771  GNUNET_i2s (&vl->target),
4772  (unsigned long long) vl->incoming_fc_window_size);
4773  monotime = GNUNET_TIME_absolute_get_monotonic (GST_cfg);
4774  vl->last_fc_transmission = monotime;
4776  fc.header.size = htons (sizeof (fc));
4777  fc.seq = htonl (vl->fc_seq_gen++);
4781  fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
4783  if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
4784  {
4787  "FC retransmission to %s failed, will retry in %s\n",
4788  GNUNET_i2s (&vl->target),
4791  }
4792  else
4793  {
4794  /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
4795  vl->last_fc_rtt = rtt;
4796  }
4797  if (NULL != vl->fc_retransmit_task)
4799  vl->fc_retransmit_task =
4801 }
4802 
4803 
/* Check whether virtual link `vl` has a message ready to transmit and,
   if so, notify idle validated queues that could carry it.
   The first pending message not already in a queue decides: if it does
   not fit the outbound flow-control window, consider_sending_fc() is
   called and nothing else happens; if it fits, idle queues of the
   direct neighbour and of the next hops of validated DV paths are
   visited.
   NOTE(review): the signature line (4821), the right-hand side of the
   window comparison (4837), and the per-queue actions (4862,
   4880-4881) are elided from this listing. */
4820 static void
4822 {
4823  struct Neighbour *n = vl->n;
4824  struct DistanceVector *dv = vl->dv;
4825  struct GNUNET_TIME_Absolute now;
4826  int elig;
4827 
4828  /* Check that we have an eligible pending message!
4829  (cheaper than having #transmit_on_queue() find out!) */
4830  elig = GNUNET_NO;
4831  for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
4832  pm = pm->next_vl)
4833  {
4834  if (NULL != pm->qe)
4835  continue; /* not eligible, is in a queue! */
4836  if (pm->bytes_msg + vl->outbound_fc_window_size_used >
4838  {
4840  "Stalled transmision on VL %s due to flow control: %llu < %llu\n",
4841  GNUNET_i2s (&vl->target),
4842  (unsigned long long) vl->outbound_fc_window_size,
4843  (unsigned long long) (pm->bytes_msg +
4845  consider_sending_fc (vl);
4846  return; /* We have a message, but flow control says "nope" */
4847  }
4848  elig = GNUNET_YES;
4849  break;
4850  }
4851  if (GNUNET_NO == elig)
4852  return;
4853 
4854  /* Notify queues at direct neighbours that we are interested */
4855  now = GNUNET_TIME_absolute_get ();
4856  if (NULL != n)
4857  {
4858  for (struct Queue *queue = n->queue_head; NULL != queue;
4859  queue = queue->next_neighbour)
4860  if ((GNUNET_YES == queue->idle) &&
4861  (queue->validated_until.abs_value_us > now.abs_value_us))
4863  }
4864  /* Notify queues via DV that we are interested */
4865  if (NULL != dv)
4866  {
4867  /* Do DV with lower scheduler priority, which effectively means that
4868  IF a neighbour exists and is available, we prefer it. */
4869  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4870  pos = pos->next_dv)
4871  {
4872  struct Neighbour *nh = pos->next_hop;
4873 
4874  if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
4875  continue; /* skip this one: path not validated */
4876  for (struct Queue *queue = nh->queue_head; NULL != queue;
4877  queue = queue->next_neighbour)
4878  if ((GNUNET_YES == queue->idle) &&
4879  (queue->validated_until.abs_value_us > now.abs_value_us))
4882  }
4883  }
4884 }
4885 
4886 
/* Handler for an OutboundMessage from a CORE client (layout already
   validated by check_client_send()).
   If no virtual link exists for obm->peer the message is counted as
   dropped.  Otherwise a PendingMessage with a copy of the embedded
   message is allocated, linked to the client and the virtual link, and
   check_vl_transmission() is called for the link.
   NOTE(review): the declaration of `pp` (4901) and lines 4914, 4923,
   4929, 4934, 4938 are elided from this listing. */
4893 static void
4894 handle_client_send (void *cls, const struct OutboundMessage *obm)
4895 {
4896  struct TransportClient *tc = cls;
4897  struct PendingMessage *pm;
4898  const struct GNUNET_MessageHeader *obmm;
4899  uint32_t bytes_msg;
4900  struct VirtualLink *vl;
4902 
4903  GNUNET_assert (CT_CORE == tc->type);
4904  obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4905  bytes_msg = ntohs (obmm->size);
4906  pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
4907  vl = lookup_virtual_link (&obm->peer);
4908  if (NULL == vl)
4909  {
4910  /* Failure: don't have this peer as a neighbour (anymore).
4911  Might have gone down asynchronously, so this is NOT
4912  a protocol violation by CORE. Still count the event,
4913  as this should be rare. */
4915  GNUNET_STATISTICS_update (GST_stats,
4916  "# messages dropped (neighbour unknown)",
4917  1,
4918  GNUNET_NO);
4919  return;
4920  }
4921 
/* The message bytes are stored directly behind the PendingMessage. */
4922  pm = GNUNET_malloc (sizeof (struct PendingMessage) + bytes_msg);
4924  pm->prefs = pp;
4925  pm->client = tc;
4926  pm->vl = vl;
4927  pm->bytes_msg = bytes_msg;
4928  memcpy (&pm[1], obmm, bytes_msg);
4930  "Sending %u bytes as <%llu> to %s\n",
4931  bytes_msg,
4932  pm->logging_uuid,
4933  GNUNET_i2s (&obm->peer));
4935  tc->details.core.pending_msg_head,
4936  tc->details.core.pending_msg_tail,
4937  pm);
4939  vl->pending_msg_head,
4940  vl->pending_msg_tail,
4941  pm);
4942  check_vl_transmission (vl);
4943 }
4944 
4945 
/* Handler for a CommunicatorBackchannel message: wraps the embedded
   message and the trailing communicator-name string into a
   TransportBackchannelEncapsulationMessage built in the local buffer
   `mbuf`.
   NOTE(review): the name line (4956), the declaration of `be`
   (4968-4969) and lines 4973, 4980, 4987-4988 are elided from this
   listing, including whatever is done with `tc` and with the finished
   message. */
4955 static void
4957  void *cls,
4958  const struct GNUNET_TRANSPORT_CommunicatorBackchannel *cb)
4959 {
4960  struct TransportClient *tc = cls;
4961  const struct GNUNET_MessageHeader *inbox =
4962  (const struct GNUNET_MessageHeader *) &cb[1];
4963  uint16_t isize = ntohs (inbox->size);
4964  const char *is = ((const char *) &cb[1]) + isize;
4965  char
4966  mbuf[isize +
4967  sizeof (struct TransportBackchannelEncapsulationMessage)] GNUNET_ALIGN;
4970 
4971  /* 0-termination of 'is' was checked already in
4972  #check_communicator_backchannel() */
4974  "Preparing backchannel transmission to %s:%s of type %u\n",
4975  GNUNET_i2s (&cb->pid),
4976  is,
4977  ntohs (inbox->size));
4978  /* encapsulate and encrypt message */
4979  be->header.type =
4981  be->header.size = htons (sizeof (mbuf));
4982  memcpy (&be[1], inbox, isize);
/* NOTE(review): mbuf holds exactly
   sizeof (struct TransportBackchannelEncapsulationMessage) + isize
   bytes, but the copy below starts at that very offset and writes
   strlen (is) + 1 more bytes - this looks like a write past the end
   of mbuf (the buffer size appears to be missing the string length).
   Confirm and fix upstream. */
4983  memcpy (&mbuf[sizeof (struct TransportBackchannelEncapsulationMessage) +
4984  isize],
4985  is,
4986  strlen (is) + 1);
4989 }
4990 
4991 
/* Validation callback for an AddAddressMessage: only clients of type
   CT_COMMUNICATOR may send it.
   NOTE(review): the name line (5000) and line 5010 are elided; a later
   comment in this file says the 0-termination of the trailing address
   is checked here, presumably on 5010 - confirm. */
4999 static int
5001  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5002 {
5003  struct TransportClient *tc = cls;
5004 
5005  if (CT_COMMUNICATOR != tc->type)
5006  {
5007  GNUNET_break (0);
5008  return GNUNET_SYSERR;
5009  }
5011  return GNUNET_OK;
5012 }
5013 
5014 
/* Forward declaration: task that stores one of our own addresses in the
   peerstore; `cls` is the `struct AddressListEntry` to store.  Needed
   because the store-completion callback reschedules this task. */
5020 static void
5021 store_pi (void *cls);
5022 
5023 
/* Completion callback for storing one of our own addresses.
   `cls` is the AddressListEntry, `success` the peerstore result.
   Clears the pending store handle (ale->sc), logs the outcome, and
   schedules store_pi() again for this entry.
   NOTE(review): the log calls (5037, 5041) and the start of the
   scheduling expression (5047) are elided; per the comment below the
   delay is a quarter of the expiration time. */
5030 static void
5031 peerstore_store_own_cb (void *cls, int success)
5032 {
5033  struct AddressListEntry *ale = cls;
5034 
5035  ale->sc = NULL;
5036  if (GNUNET_YES != success)
5038  "Failed to store our own address `%s' in peerstore!\n",
5039  ale->address);
5040  else
5042  "Successfully stored our own address `%s' in peerstore!\n",
5043  ale->address);
5044  /* refresh period is 1/4 of expiration time, that should be plenty
5045  without being excessive. */
5046  ale->st =
5048  4ULL),
5049  &store_pi,
5050  ale);
5051 }
5052 
5053 
/* Task: store the address in `cls` (an AddressListEntry) in the
   peerstore.  Clears the task handle (ale->st), computes the absolute
   expiration from the entry's relative one, builds an address blob
   (`addr` / `addr_len`) and passes it to GNUNET_PEERSTORE_store() under
   sub-system "transport" for GST_my_identity.  The blob is freed
   afterwards; if no store handle is returned, a retry is scheduled.
   NOTE(review): lines 5069, 5072-5073, 5075, 5082, 5086-5087, 5092 and
   5096 are elided from this listing, including the call that produces
   `addr` and the retry delay. */
5059 static void
5060 store_pi (void *cls)
5061 {
5062  struct AddressListEntry *ale = cls;
5063  void *addr;
5064  size_t addr_len;
5065  struct GNUNET_TIME_Absolute expiration;
5066 
5067  ale->st = NULL;
5068  expiration = GNUNET_TIME_relative_to_absolute (ale->expiration);
5070  "Storing our address `%s' in peerstore until %s!\n",
5071  ale->address,
5074  ale->nt,
5076  GST_my_private_key,
5077  &addr,
5078  &addr_len);
5079  ale->sc = GNUNET_PEERSTORE_store (peerstore,
5080  "transport",
5081  &GST_my_identity,
5083  addr,
5084  addr_len,
5085  expiration,
5088  ale);
5089  GNUNET_free (addr);
5090  if (NULL == ale->sc)
5091  {
5093  "Failed to store our address `%s' with peerstore\n",
5094  ale->address);
5095  ale->st =
5097  }
5098 }
5099 
5100 
/* Handler for an AddAddressMessage from a communicator.
   Allocates an AddressListEntry with the address string stored directly
   behind it, copies expiration, address id and network type from the
   message, links the entry into the communicator's address list and
   schedules store_pi() to run immediately for it.
   NOTE(review): the name line (5108) and lines 5116, 5127, 5131 are
   elided from this listing. */
5107 static void
5109  const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5110 {
5111  struct TransportClient *tc = cls;
5112  struct AddressListEntry *ale;
5113  size_t slen;
5114 
5115  /* 0-termination of &aam[1] was checked in #check_add_address */
5117  "Communicator added address `%s'!\n",
5118  (const char *) &aam[1]);
/* slen counts all trailing bytes of the message. */
5119  slen = ntohs (aam->header.size) - sizeof (*aam);
5120  ale = GNUNET_malloc (sizeof (struct AddressListEntry) + slen);
5121  ale->tc = tc;
5122  ale->address = (const char *) &ale[1];
5123  ale->expiration = GNUNET_TIME_relative_ntoh (aam->expiration);
/* aid is stored as received (no byte-order conversion); the delete
   handler compares it the same way. */
5124  ale->aid = aam->aid;
5125  ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5126  memcpy (&ale[1], &aam[1], slen);
5128  tc->details.communicator.addr_tail,
5129  ale);
5130  ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5132 }
5133 
5134 
/* Handler for a DelAddressMessage from a communicator: walks the
   communicator's address list looking for the entry whose `aid` equals
   dam->aid.  `alen` caches the next pointer so the current entry can be
   removed while iterating.
   NOTE(review): the name line (5142) and lines 5151, 5162, 5165-5166,
   5169 are elided.  The GNUNET_break (0) after the loop suggests the
   elided lines inside the loop free the entry and return, so falling
   out of the loop means "address not found" - confirm. */
5141 static void
5143  const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5144 {
5145  struct TransportClient *tc = cls;
5146  struct AddressListEntry *alen;
5147 
5148  if (CT_COMMUNICATOR != tc->type)
5149  {
5150  GNUNET_break (0);
5152  return;
5153  }
5154  for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5155  NULL != ale;
5156  ale = alen)
5157  {
5158  alen = ale->next;
5159  if (dam->aid != ale->aid)
5160  continue;
5161  GNUNET_assert (ale->tc == tc);
5163  "Communicator deleted address `%s'!\n",
5164  ale->address);
5167  }
5168  GNUNET_break (0);
5170 }
5171 
5172 
/* Forward declaration of a message-processing function taking a
   `struct GNUNET_MessageHeader`.
   NOTE(review): the line carrying the function name and first
   parameter (5181) is elided from this listing. */
5180 static void
5182  const struct GNUNET_MessageHeader *msg);
5183 
5184 
/* Callback run for a CoreSentContext (`cls`), which is freed here.
   If the virtual link is gone the context is just released.  Otherwise
   the link's RAM accounting shrinks by ctx->size, the used part of the
   incoming flow-control window grows by ctx->isize, and
   consider_sending_fc() is called.
   NOTE(review): lines 5204-5205 are elided from this listing. */
5192 static void
5193 core_env_sent_cb (void *cls)
5194 {
5195  struct CoreSentContext *ctx = cls;
5196  struct VirtualLink *vl = ctx->vl;
5197 
5198  if (NULL == vl)
5199  {
5200  /* lost the link in the meantime, ignore */
5201  GNUNET_free (ctx);
5202  return;
5203  }
5206  vl->incoming_fc_window_size_ram -= ctx->size;
5207  vl->incoming_fc_window_size_used += ctx->isize;
5208  consider_sending_fc (vl);
5209  GNUNET_free (ctx);
5210 }
5211 
5212 
/* Deliver an unencapsulated message `mh` from a communicator to all
   CORE clients.  `cls` is the CommunicatorMessageContext.
   The message is dropped (and the context finished) when its size is
   out of range (the communicator client is dropped too), when no
   virtual link exists for the sender, or when flow-control accounting
   would overflow.  Otherwise one InboundMessage per CORE client is
   queued; only the first client's CoreSentContext carries a non-zero
   `isize`, so the window is charged once.  If core_recv_window is still
   positive afterwards the context is finished immediately; otherwise it
   is kept until CORE has caught up.
   NOTE(review): lines 5268, 5288-5289, 5295-5296, 5304, 5308, 5313 and
   5324 are elided from this listing (window check, envelope creation,
   logging, and the final queuing of `cmc`). */
5221 static void
5222 handle_raw_message (void *cls, const struct GNUNET_MessageHeader *mh)
5223 {
5224  struct CommunicatorMessageContext *cmc = cls;
5225  struct VirtualLink *vl;
5226  uint16_t size = ntohs (mh->size);
5227  int have_core;
5228 
5229  if ((size > UINT16_MAX - sizeof (struct InboundMessage)) ||
5230  (size < sizeof (struct GNUNET_MessageHeader)))
5231  {
/* Grab the client first: finish_cmc_handling() frees cmc. */
5232  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5233 
5234  GNUNET_break (0);
5235  finish_cmc_handling (cmc);
5236  GNUNET_SERVICE_client_drop (client);
5237  return;
5238  }
5239  vl = lookup_virtual_link (&cmc->im.sender);
5240  if (NULL == vl)
5241  {
5242  /* FIXME: sender is giving us messages for CORE but we don't have
5243  the link up yet! I *suspect* this can happen right now (i.e.
5244  sender has verified us, but we didn't verify sender), but if
5245  we pass this on, CORE would be confused (link down, messages
5246  arrive). We should investigate more if this happens often,
5247  or in a persistent manner, and possibly do "something" about
5248  it. Thus logging as error for now. */
5249  GNUNET_break_op (0);
5250  GNUNET_STATISTICS_update (GST_stats,
5251  "# CORE messages droped (virtual link still down)",
5252  1,
5253  GNUNET_NO);
5254 
5255  finish_cmc_handling (cmc);
5256  return;
5257  }
5258  if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5259  {
5260  GNUNET_STATISTICS_update (GST_stats,
5261  "# CORE messages droped (FC arithmetic overflow)",
5262  1,
5263  GNUNET_NO);
5264 
5265  finish_cmc_handling (cmc);
5266  return;
5267  }
5269  {
5270  GNUNET_STATISTICS_update (GST_stats,
5271  "# CORE messages droped (FC window overflow)",
5272  1,
5273  GNUNET_NO);
5274  finish_cmc_handling (cmc);
5275  return;
5276  }
5277 
5278  /* Forward to all CORE clients */
5279  have_core = GNUNET_NO;
5280  for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5281  {
5282  struct GNUNET_MQ_Envelope *env;
5283  struct InboundMessage *im;
5284  struct CoreSentContext *ctx;
5285 
5286  if (CT_CORE != tc->type)
5287  continue;
5290  ctx = GNUNET_new (struct CoreSentContext);
5291  ctx->vl = vl;
5292  ctx->size = size;
/* Charge the flow-control window only for the first CORE client. */
5293  ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5294  have_core = GNUNET_YES;
5297  im->peer = cmc->im.sender;
5298  memcpy (&im[1], mh, size);
5299  GNUNET_MQ_send (tc->mq, env);
5300  vl->core_recv_window--;
5301  }
5302  if (GNUNET_NO == have_core)
5303  {
5305  "Dropped message to CORE: no CORE client connected!\n");
5306  /* Nevertheless, count window as used, as it is from the
5307  perspective of the other peer! */
5309  /* TODO-M1 */
5310  finish_cmc_handling (cmc);
5311  return;
5312  }
5314  "Delivered message from %s of type %u to CORE\n",
5315  GNUNET_i2s (&cmc->im.sender),
5316  ntohs (mh->type));
5317  if (vl->core_recv_window > 0)
5318  {
5319  finish_cmc_handling (cmc);
5320  return;
5321  }
5322  /* Wait with calling #finish_cmc_handling(cmc) until the message
5323  was processed by CORE MQs (for CORE flow control)! */
5325 }
5326 
5327 
/* Validation callback for a fragment box `fb`.
   The fragment must carry at least one payload byte, must start inside
   the full message (frag_off < msg_size) and must not extend past its
   end (payload size + frag_off <= msg_size).
   NOTE(review): the signature line (5336) is elided from this listing.
   NOTE(review): success is reported as GNUNET_YES, whereas the other
   check_* callbacks in this file return GNUNET_OK; harmless only if
   both constants have the same value - confirm. */
5335 static int
5337 {
5338  uint16_t size = ntohs (fb->header.size);
5339  uint16_t bsize = size - sizeof (*fb);
5340 
5341  (void) cls;
5342  if (0 == bsize)
5343  {
5344  GNUNET_break_op (0);
5345  return GNUNET_SYSERR;
5346  }
5347  if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
5348  {
5349  GNUNET_break_op (0);
5350  return GNUNET_SYSERR;
5351  }
5352  if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
5353  {
5354  GNUNET_break_op (0);
5355  return GNUNET_SYSERR;
5356  }
5357  return GNUNET_YES;
5358 }
5359 
5360 
/* Task: dispose of an AcknowledgementCummulator (`cls`).  It must have
   no buffered ACKs (asserted) and must be present in the
   ack_cummulators map (removal asserted); it is then freed.
   NOTE(review): the signature line (5367) is elided from this listing. */
5366 static void
5368 {
5369  struct AcknowledgementCummulator *ac = cls;
5370 
5371  ac->task = NULL;
5372  GNUNET_assert (0 == ac->num_acks);
5373  GNUNET_assert (
5374  GNUNET_YES ==
5375  GNUNET_CONTAINER_multipeermap_remove (ack_cummulators, &ac->target, ac));
5376  GNUNET_free (ac);
5377 }
5378 
5379 
/* Task: build one TransportReliabilityAckMessage from the ACKs buffered
   in the AcknowledgementCummulator (`cls`), send it (call elided), reset
   num_acks to 0 and schedule a follow-up task (call partly elided).
   NOTE(review): the signature line (5386) and lines 5393-5394, 5397,
   5402, 5411, 5414, 5416-5417 are elided from this listing.
   NOTE(review): the buffer size, the header size and the loop bound all
   use ac->ack_counter, and "ac->ack_counter++" changes it between the
   size computation and the loop.  Buffered entries are stored elsewhere
   in this file under ac->num_acks (ac->ack_uuids[ac->num_acks]), so
   num_acks looks like the intended count here; as written the loop can
   index one entry past the sized buffer.  Confirm and fix upstream. */
5385 static void
5387 {
5388  struct AcknowledgementCummulator *ac = cls;
5389  char buf[sizeof (struct TransportReliabilityAckMessage) +
5390  ac->ack_counter *
5391  sizeof (struct TransportCummulativeAckPayloadP)] GNUNET_ALIGN;
5392  struct TransportReliabilityAckMessage *ack =
5395 
5396  ac->task = NULL;
5398  "Sending ACK with %u components to %s\n",
5399  ac->ack_counter,
5400  GNUNET_i2s (&ac->target));
5401  GNUNET_assert (0 < ac->ack_counter);
5403  ack->header.size =
5404  htons (sizeof (*ack) +
5405  ac->ack_counter * sizeof (struct TransportCummulativeAckPayloadP));
5406  ack->ack_counter = htonl (ac->ack_counter++);
5407  ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
5408  for (unsigned int i = 0; i < ac->ack_counter; i++)
5409  {
5410  ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
5412  GNUNET_TIME_absolute_get_duration (ac->ack_uuids[i].receive_time));
5413  }
5415  ac->num_acks = 0;
5418  ac);
5419 }
5420 
5421 
/* Queue an acknowledgement `ack_uuid` for peer `pid`, to be sent no
   later than `max_delay`.
   The per-peer AcknowledgementCummulator is looked up in
   ack_cummulators and created on demand.  For an existing cumulator
   whose buffer already holds MAX_CUMMULATIVE_ACKS entries, the elided
   lines inside that branch handle the full buffer ("must run
   immediately").  The UUID is then appended at index num_acks.
   NOTE(review): the name/first-parameter line (5431) and lines 5437,
   5447-5448, 5452, 5459-5460, 5462, 5464, 5466-5467, 5470-5471 are
   elided from this listing, including the scheduling calls; confirm
   that the full-buffer branch really empties the buffer before the
   append below. */
5430 static void
5432  const struct AcknowledgementUUIDP *ack_uuid,
5433  struct GNUNET_TIME_Absolute max_delay)
5434 {
5435  struct AcknowledgementCummulator *ac;
5436 
5438  "Scheduling ACK %s for transmission to %s\n",
5439  GNUNET_uuid2s (&ack_uuid->value),
5440  GNUNET_i2s (pid));
5441  ac = GNUNET_CONTAINER_multipeermap_get (ack_cummulators, pid);
5442  if (NULL == ac)
5443  {
5444  ac = GNUNET_new (struct AcknowledgementCummulator);
5445  ac->target = *pid;
5446  ac->min_transmission_time = max_delay;
5449  ack_cummulators,
5450  &ac->target,
5451  ac,
5453  }
5454  else
5455  {
5456  if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
5457  {
5458  /* must run immediately, ack buffer full! */
5461  }
5463  ac->min_transmission_time =
5465  }
5468  ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
5469  ac->num_acks++;
5472  ac);
5473 }
5474 
5475 
/*
 * Search context handed to find_by_message_uuid().
 *
 * NOTE(review): the `struct` header line and the `rc` result member
 * (written by find_by_message_uuid()) are missing from this listing.
 */
5480 {
 /* UUID of the message whose reassembly context is being looked up. */
5484  struct MessageUUIDP message_uuid;
5485 
5490 };
5491 
5492 
5502 static int
5503 find_by_message_uuid (void *cls, uint32_t key, void *value)
5504 {
5505  struct FindByMessageUuidContext *fc = cls;
5506  struct ReassemblyContext *rc = value;
5507 
5508  (void) key;
5509  if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
5510  {
5511  fc->rc = rc;
5512  return GNUNET_NO;
5513  }
5514  return GNUNET_YES;
5515 }
5516 
5517 
/**
 * Process one incoming message fragment: find or create the reassembly
 * context for the fragment's message UUID, copy the fragment payload
 * into the reassembly buffer, track which bytes have arrived, schedule
 * a cummulative ACK and, once every byte is present, pass the
 * reassembled message on via demultiplex_with_cmc().
 *
 * NOTE(review): the function-name line (declaring `cls` and the
 * fragment `fb`) and a number of statements (heap creation, timeout
 * task scheduling, map insertion, log calls, initial `cdelay`, ACK
 * deadline argument, cleanup calls) are missing from this listing.
 *
 * @param cls a `struct CommunicatorMessageContext`
 */
5525 static void
5527 {
5528  struct CommunicatorMessageContext *cmc = cls;
5529  struct Neighbour *n;
5530  struct ReassemblyContext *rc;
5531  const struct GNUNET_MessageHeader *msg;
5532  uint16_t msize;
5533  uint16_t fsize;
5534  uint16_t frag_off;
5535  char *target;
5536  struct GNUNET_TIME_Relative cdelay;
5537  struct FindByMessageUuidContext fc;
5538 
5539  n = lookup_neighbour (&cmc->im.sender);
5540  if (NULL == n)
5541  {
 /* Fragment from a peer we have no neighbour entry for: drop the
    communicator client that delivered it. */
5542  struct GNUNET_SERVICE_Client *client = cmc->tc->client;
5543 
5544  GNUNET_break (0);
5545  finish_cmc_handling (cmc);
5546  GNUNET_SERVICE_client_drop (client);
5547  return;
5548  }
5549  if (NULL == n->reassembly_map)
5550  {
 /* Reassembly state for a neighbour is created lazily. */
5551  n->reassembly_map = GNUNET_CONTAINER_multihashmap32_create (8);
5552  n->reassembly_heap =
5554  n->reassembly_timeout_task =
5557  n);
5558  }
5559  msize = ntohs (fb->msg_size);
 /* Look for an existing reassembly context with the same message UUID. */
5560  fc.message_uuid = fb->msg_uuid;
5561  fc.rc = NULL;
5562  (void) GNUNET_CONTAINER_multihashmap32_get_multiple (n->reassembly_map,
5563  fb->msg_uuid.uuid,
5565  &fc);
5566  if (NULL == (rc = fc.rc))
5567  {
 /* Single allocation: context, then payload buffer, then the bitfield
    with one bit per payload byte. */
5568  rc = GNUNET_malloc (sizeof (*rc) + msize + /* reassembly payload buffer */
5569  (msize + 7) / 8 * sizeof (uint8_t) /* bitfield */);
5570  rc->msg_uuid = fb->msg_uuid;
5571  rc->neighbour = n;
5572  rc->msg_size = msize;
5573  rc->reassembly_timeout =
5576  rc->hn = GNUNET_CONTAINER_heap_insert (n->reassembly_heap,
5577  rc,
5581  n->reassembly_map,
5582  rc->msg_uuid.uuid,
5583  rc,
5585  target = (char *) &rc[1];
5586  rc->bitfield = (uint8_t *) (target + rc->msg_size);
5587  rc->msg_missing = rc->msg_size;
5589  "Received fragment at offset %u/%u from %s for NEW message %u\n",
5590  ntohs (fb->frag_off),
5591  msize,
5592  GNUNET_i2s (&cmc->im.sender),
5593  (unsigned int) fb->msg_uuid.uuid);
5594  }
5595  else
5596  {
5597  target = (char *) &rc[1];
5599  "Received fragment at offset %u/%u from %s for message %u\n",
5600  ntohs (fb->frag_off),
5601  msize,
5602  GNUNET_i2s (&cmc->im.sender),
5603  (unsigned int) fb->msg_uuid.uuid);
5604  }
 /* Only relevant for a pre-existing context: all fragments of one
    message must announce the same total size. */
5605  if (msize != rc->msg_size)
5606  {
5607  GNUNET_break (0);
5608  finish_cmc_handling (cmc);
5609  return;
5610  }
5611 
5612  /* reassemble */
5613  fsize = ntohs (fb->header.size) - sizeof (*fb);
5614  if (0 == fsize)
5615  {
5616  GNUNET_break (0);
5617  finish_cmc_handling (cmc);
5618  return;
5619  }
5620  frag_off = ntohs (fb->frag_off);
5621  if (frag_off + fsize > msize)
5622  {
5623  /* Fragment (plus fragment size) exceeds message size! */
5624  GNUNET_break_op (0);
5625  finish_cmc_handling (cmc);
5626  return;
5627  }
5628  memcpy (&target[frag_off], &fb[1], fsize);
5629  /* update bitfield and msg_missing */
 /* Bytes that were already received (duplicates, overlaps) are not
    counted a second time. */
5630  for (unsigned int i = frag_off; i < frag_off + fsize; i++)
5631  {
5632  if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
5633  {
5634  rc->bitfield[i / 8] |= (1 << (i % 8));
5635  rc->msg_missing--;
5636  }
5637  }
5638 
5639  /* Compute cummulative ACK */
 /* The ACK delay is scaled by the (integer) number of fragments of this
    size still outstanding; a complete message is ACKed without delay. */
5641  cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
5642  if (0 == rc->msg_missing)
5643  cdelay = GNUNET_TIME_UNIT_ZERO;
5644  cummulative_ack (&cmc->im.sender,
5645  &fb->ack_uuid,
5648  /* is reassembly complete? */
5649  if (0 != rc->msg_missing)
5650  {
5651  finish_cmc_handling (cmc);
5652  return;
5653  }
5654  /* reassembly is complete, verify result */
 /* The reassembled payload must itself be a message whose header
    announces exactly the reassembled size. */
5655  msg = (const struct GNUNET_MessageHeader *) &rc[1];
5656  if (ntohs (msg->size) != rc->msg_size)
5657  {
5658  GNUNET_break (0);
5660  finish_cmc_handling (cmc);
5661  return;
5662  }
5663  /* successful reassembly */
5665  "Fragment reassembly complete for message %u\n",
5666  (unsigned int) fb->msg_uuid.uuid);
5667  /* FIXME: check that the resulting msg is NOT a
5668  DV Box or Reliability Box, as that is NOT allowed! */
5669  demultiplex_with_cmc (cmc, msg);
5670  /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
5671  en-route and we forget that we finished this reassembly immediately!
5672  -> keep around until timeout?
5673  -> shorten timeout based on ACK? */
5675 }
5676 
5677 
/**
 * Validation hook for an incoming reliability box message.
 *
 * NOTE(review): the function-name line and one statement between the
 * `(void) cls;` and the `return` are missing from this listing; the
 * visible code accepts the message unconditionally.
 *
 * @param rb the reliability box to validate
 * @return #GNUNET_YES on the visible path
 */
5685 static int
5687  const struct TransportReliabilityBoxMessage *rb)
5688 {
5689  (void) cls;
5691  return GNUNET_YES;
5692 }
5693 
5694 
/**
 * Handle an incoming reliability box: schedule a cummulative ACK for
 * the box's `ack_uuid` and then process the message contained in it
 * via demultiplex_with_cmc().
 *
 * NOTE(review): the function-name line, the log call line and part of
 * the ACK deadline expression are missing from this listing.
 *
 * @param rb the reliability box; the boxed message follows its header
 */
5702 static void
5704  const struct TransportReliabilityBoxMessage *rb)
5705 {
5706  struct CommunicatorMessageContext *cmc = cls;
 /* The inner message starts right behind the fixed-size box header. */
5707  const struct GNUNET_MessageHeader *inbox =
5708  (const struct GNUNET_MessageHeader *) &rb[1];
5709  struct GNUNET_TIME_Relative rtt;
5710 
5712  "Received reliability box from %s with UUID %s of type %u\n",
5713  GNUNET_i2s (&cmc->im.sender),
5714  GNUNET_uuid2s (&rb->ack_uuid.value),
5715  (unsigned int) ntohs (inbox->type));
5716  rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
5717  do not really have an RTT for the
5718  *incoming* queue (should we have
5719  the sender add it to the rb message?) */
 /* The ACK deadline depends on whether the sender's `ack_countdown`
    has reached zero; otherwise it is derived from rtt / 8. */
5720  cummulative_ack (
5721  &cmc->im.sender,
5722  &rb->ack_uuid,
5723  (0 == ntohl (rb->ack_countdown))
5726  GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
5727  /* continue with inner message */
5728  /* FIXME: check that inbox is NOT a DV Box, fragment or another
5729  reliability box (not allowed!) */
5730  demultiplex_with_cmc (cmc, inbox);
5731 }
5732 
5733 
5742 static void
5743 update_pd_age (struct PerformanceData *pd, unsigned int age)
5744 {
5745  unsigned int sage;
5746 
5747  if (age == pd->last_age)
5748  return; /* nothing to do */
5749  sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
5750  for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
5751  {
5752  struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
5753 
5754  the->bytes_sent = 0;
5755  the->bytes_received = 0;
5756  }
5757  pd->last_age = age;
5758 }
5759 
5760 
/**
 * Fold a new RTT sample and a count of successfully transmitted bytes
 * into the performance data `pd`.
 *
 * NOTE(review): the line with the function name and the first
 * parameter (`pd`, a `struct PerformanceData *` judging by its use
 * with update_pd_age()) is missing from this listing.
 *
 * @param rtt new round-trip-time sample
 * @param bytes_transmitted_ok bytes added to the current history slot
 */
5769 static void
5771  struct GNUNET_TIME_Relative rtt,
5772  uint16_t bytes_transmitted_ok)
5773 {
5774  uint64_t nval = rtt.rel_value_us;
5775  uint64_t oval = pd->aged_rtt.rel_value_us;
5776  unsigned int age = get_age ();
5777  struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
5778 
 /* "Forever" marks "no sample yet": take the first sample as-is,
    afterwards blend 1/8 new sample with 7/8 old value. */
5779  if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
5780  pd->aged_rtt = rtt;
5781  else
5782  pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
 /* Age the history first so the bytes are added to a current slot. */
5783  update_pd_age (pd, age);
5784  the->bytes_received += bytes_transmitted_ok;
5785 }
5786 
5787 
/**
 * Record a successful transmission in the performance data of a queue;
 * thin wrapper around update_performance_data() on `q->pd`.
 *
 * NOTE(review): the line with the function name and the `q` parameter
 * is missing from this listing.
 *
 * @param rtt round-trip-time sample
 * @param bytes_transmitted_ok number of bytes acknowledged
 */
5795 static void
5797  struct GNUNET_TIME_Relative rtt,
5798  uint16_t bytes_transmitted_ok)
5799 {
5800  update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
5801 }
5802 
5803 
/**
 * Record a successful transmission in the performance data of a
 * distance vector hop; thin wrapper around update_performance_data()
 * on `dvh->pd`.
 *
 * NOTE(review): the line with the function name and the `dvh`
 * parameter is missing from this listing.
 *
 * @param rtt round-trip-time sample
 * @param bytes_transmitted_ok number of bytes acknowledged
 */
5811 static void
5813  struct GNUNET_TIME_Relative rtt,
5814  uint16_t bytes_transmitted_ok)
5815 {
5816  update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
5817 }
5818 
5819 
/**
 * React to the completed transmission of the pending message `pm`,
 * depending on its type: answer the client, free the message, or (for
 * fragments) prune the fragment tree and answer the client once the
 * whole tree is done.
 *
 * NOTE(review): the line with the function name and the `pm`
 * parameter, and the log call line of the DV box case, are missing
 * from this listing.
 */
5827 static void
5829 {
5830  struct PendingMessage *pos;
5831 
5832  switch (pm->pmt)
5833  {
5834  case PMT_CORE:
5835  case PMT_RELIABILITY_BOX:
5836  /* Full message sent, we are done */
5837  client_send_response (pm);
5838  return;
5839  case PMT_FRAGMENT_BOX:
5840  /* Fragment sent over reliable channel */
5841  free_fragment_tree (pm);
5842  pos = pm->frag_parent;
5843  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
5844  GNUNET_free (pm);
5845  /* check if subtree is done */
 /* NOTE(review): `pm` was freed just above, yet its (now dangling)
    value is compared against `pos` in the loop condition.  Also, the
    condition does not test `pos->frag_parent`: when `pos` is the root
    (parent NULL, as the check after the loop allows) and fully sent,
    the body sets `pos` to NULL and dereferences it -- confirm. */
5846  while ((NULL == pos->head_frag) && (pos->frag_off == pos->bytes_msg) &&
5847  (pos != pm))
5848  {
5849  pm = pos;
5850  pos = pm->frag_parent;
5851  GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
5852  GNUNET_free (pm);
5853  }
5854 
5855  /* Was this the last applicable fragment? */
 /* Only the root (no parent) with no fragments left and all bytes
    fragmented triggers the response to the client. */
5856  if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
5857  (pos->frag_off == pos->bytes_msg))
5858  client_send_response (pos);
5859  return;
5860  case PMT_DV_BOX:
5862  "Completed transmission of message %llu (DV Box)\n",
5863  pm->logging_uuid);
5864  free_pending_message (pm);
5865  return;
5866  }
5867 }
5868 
5869 
/**
 * Process the acknowledgement of the pending acknowledgement `pa`:
 * derive a delay value and feed it, together with `pa->message_size`,
 * into the performance data of the queue and/or DV hop that were used.
 *
 * NOTE(review): the function-name line (declaring `pa`), the statement
 * that initialises `delay`, and the statements following the
 * `pa->pm` test are missing from this listing.
 *
 * @param ack_delay how long the receiver held back the ACK
 */
5877 static void
5879  struct GNUNET_TIME_Relative ack_delay)
5880 {
5881  struct GNUNET_TIME_Relative delay;
5882 
 /* NOTE(review): the branches look swapped.  As written, a `delay`
    larger than `ack_delay` is discarded (set to zero), while the
    subtraction only runs when `ack_delay >= delay`, where it cannot
    produce a positive duration.  Presumably the intent is
    "delay - ack_delay, clamped at zero" -- confirm. */
5884  if (delay.rel_value_us > ack_delay.rel_value_us)
5885  delay = GNUNET_TIME_UNIT_ZERO;
5886  else
5887  delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
5888  if (NULL != pa->queue)
5889  update_queue_performance (pa->queue, delay, pa->message_size);
5890  if (NULL != pa->dvh)
5891  update_dvh_performance (pa->dvh, delay, pa->message_size);
5892  if (NULL != pa->pm)
5895 }
5896 
5897 
/**
 * Validate the size of an incoming reliability ACK: the bytes behind
 * the fixed header must form at least one, and a whole number of,
 * `struct TransportCummulativeAckPayloadP` entries.
 *
 * NOTE(review): the line with the function name and the `cls`
 * parameter is missing from this listing.
 *
 * @param ra the ACK message to validate
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
5905 static int
5907  const struct TransportReliabilityAckMessage *ra)
5908 {
5909  unsigned int n_acks;
5910 
5911  (void) cls;
5912  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
5913  sizeof (struct TransportCummulativeAckPayloadP);
5914  if (0 == n_acks)
5915  {
5916  GNUNET_break_op (0);
5917  return GNUNET_SYSERR;
5918  }
 /* Reject trailing bytes that do not make up a full payload entry. */
5919  if ((ntohs (ra->header.size) - sizeof (*ra)) !=
5920  n_acks * sizeof (struct TransportCummulativeAckPayloadP))
5921  {
5922  GNUNET_break_op (0);
5923  return GNUNET_SYSERR;
5924  }
5925  return GNUNET_OK;
5926 }
5927 
5928 
/**
 * Handle an incoming reliability ACK: for every payload entry, look up
 * the matching pending acknowledgement in #pending_acks by UUID and
 * pass it to handle_acknowledged(); unknown UUIDs are counted in the
 * statistics and skipped.
 *
 * NOTE(review): the function-name line and the opening lines of the
 * log and statistics calls are missing from this listing.
 *
 * @param ra the ACK message; payload entries follow its header
 */
5936 static void
5938  const struct TransportReliabilityAckMessage *ra)
5939 {
5940  struct CommunicatorMessageContext *cmc = cls;
5941  const struct TransportCummulativeAckPayloadP *ack;
5942  unsigned int n_acks;
5943  uint32_t ack_counter;
5944 
5945  n_acks = (ntohs (ra->header.size) - sizeof (*ra)) /
5946  sizeof (struct TransportCummulativeAckPayloadP);
5947  ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
5948  for (unsigned int i = 0; i < n_acks; i++)
5949  {
5950  struct PendingAcknowledgement *pa =
5951  GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
5952  if (NULL == pa)
5953  {
5955  "Received ACK from %s with UUID %s which is unknown to us!\n",
5956  GNUNET_i2s (&cmc->im.sender),
5957  GNUNET_uuid2s (&ack[i].ack_uuid.value));
5959  GST_stats,
5960  "# FRAGMENT_ACKS dropped, no matching pending message",
5961  1,
5962  GNUNET_NO);
5963  continue;
5964  }
5966  "Received ACK from %s with UUID %s\n",
5967  GNUNET_i2s (&cmc->im.sender),
5968  GNUNET_uuid2s (&ack[i].ack_uuid.value));
5969  handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
5970  }
5971 
 /* NOTE(review): this decodes a network-order field, so ntohl() would
    state the intent; htonl() performs the same byte swap. */
5972  ack_counter = htonl (ra->ack_counter);
5973  (void) ack_counter; /* silence compiler warning for now */
5974  // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
5975  // (DV and/or Neighbour?)
5976  finish_cmc_handling (cmc);
5977 }
5978 
5979 
/**
 * Validate an incoming backchannel encapsulation message.  Behind the
 * fixed header `be` there must be an inner message followed by at
 * least one more byte, and the last byte of the whole message must be
 * a 0-terminator (the trailing part is treated as a C string).
 *
 * NOTE(review): the function-name line and the line declaring `be`
 * are missing from this listing.
 *
 * @param cls unused
 * @return #GNUNET_YES if well-formed, #GNUNET_SYSERR otherwise
 */
5987 static int
5989  void *cls,
5991 {
 /* Bytes available behind the fixed-size header. */
5992  uint16_t size = ntohs (be->header.size) - sizeof (*be);
5993  const struct GNUNET_MessageHeader *inbox =
5994  (const struct GNUNET_MessageHeader *) &be[1];
5995  const char *is;
5996  uint16_t isize;
5997 
5998  (void) cls;
 /* NOTE(review): `inbox->size` is read before checking that `size`
    covers a full `struct GNUNET_MessageHeader`, and an inner size
    smaller than a header is not rejected -- confirm whether a caller
    guarantees this. */
5999  if (ntohs (inbox->size) >= size)
6000  {
6001  GNUNET_break_op (0);
6002  return GNUNET_SYSERR;
6003  }
6004  isize = ntohs (inbox->size);
 /* `is` points at the bytes following the inner message; `size`
    becomes their count (at least 1 thanks to the check above). */
6005  is = ((const char *) inbox) + isize;
6006  size -= isize;
6007  if ('\0' != is[size - 1])
6008  {
6009  GNUNET_break_op (0);
6010  return GNUNET_SYSERR;
6011  }
6012  return GNUNET_YES;
6013 }
6014 
6015 
/**
 * Deliver an encapsulated backchannel message to the local
 * communicator named in the message.  The communicator is selected by
 * comparing the string that follows the inner message against the
 * `address_prefix` of every connected communicator client.
 *
 * NOTE(review): the function-name line, the line declaring `be`, the
 * log call line and the message type argument of the envelope are
 * missing from this listing.
 *
 * NOTE(review): no finish_cmc_handling() call is visible on either
 * exit path, unlike the sibling handlers in this file -- confirm the
 * context is released elsewhere.
 *
 * @param cls a `struct CommunicatorMessageContext`
 */
6024 static void
6026  void *cls,
6028 {
6029  struct CommunicatorMessageContext *cmc = cls;
6030  struct GNUNET_TRANSPORT_CommunicatorBackchannelIncoming *cbi;
6031  struct GNUNET_MQ_Envelope *env;
6032  struct TransportClient *tc;
6033  const struct GNUNET_MessageHeader *inbox =
6034  (const struct GNUNET_MessageHeader *) &be[1];
6035  uint16_t isize = ntohs (inbox->size);
 /* The target communicator's name is the string right behind the
    inner message. */
6036  const char *target_communicator = ((const char *) inbox) + isize;
6037 
6038  /* Find client providing this communicator */
6039  for (tc = clients_head; NULL != tc; tc = tc->next)
6040  if ((CT_COMMUNICATOR == tc->type) &&
6041  (0 ==
6042  strcmp (tc->details.communicator.address_prefix, target_communicator)))
6043  break;
6044  if (NULL == tc)
6045  {
6046  char *stastr;
6047 
 /* The statistic name embeds the requested communicator name, which
    comes from the received message. */
6048  GNUNET_asprintf (
6049  &stastr,
6050  "# Backchannel message dropped: target communicator `%s' unknown",
6051  target_communicator);
6052  GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
6053  GNUNET_free (stastr);
6054  return;
6055  }
6056  /* Finally, deliver backchannel message to communicator */
6058  "Delivering backchannel message from %s of type %u to %s\n",
6059  GNUNET_i2s (&cmc->im.sender),
6060  ntohs (inbox->type),
6061  target_communicator);
6062  env = GNUNET_MQ_msg_extra (
6063  cbi,
6064  isize,
6066  cbi->pid = cmc->im.sender;
 /* Only the inner message is forwarded, without the trailing name. */
6067  memcpy (&cbi[1], inbox, isize);
6068  GNUNET_MQ_send (tc->mq, env);
6069 }
6070 
6071 
/**
 * Task run to clean up the paths of a distance vector route.  Walks
 * the hops starting at `dv->dv_head`; if no hop is left afterwards the
 * whole route is released via free_dv_route(), otherwise the task is
 * scheduled again.
 *
 * NOTE(review): the loop's exit condition, the statement that removes
 * a hop and the scheduler call are on lines missing from this listing.
 *
 * @param cls the `struct DistanceVector` to clean up
 */
6081 static void
6082 path_cleanup_cb (void *cls)
6083 {
6084  struct DistanceVector *dv = cls;
6085  struct DistanceVectorHop *pos;
6086 
6087  dv->timeout_task = NULL;
6088  while (NULL != (pos = dv->dv_head))
6089  {
6090  GNUNET_assert (dv == pos->dv);
6092  break;
6094  }
6095  if (NULL == pos)
6096  {
 /* No path left: the route itself is obsolete. */
6097  free_dv_route (dv);
6098  return;
6099  }
6100  dv->timeout_task =
6102 }
6103 
6104 
/**
 * Make the distance vector route of `hop` usable as a virtual link.
 * If a virtual link to the route's target already exists, it merely
 * gets the route attached; otherwise a new `struct VirtualLink` is
 * created, cross-linked with the route and announced.
 *
 * NOTE(review): the function-name line (declaring `hop`) and several
 * statements (log calls, UUID counter initialisation, further field
 * setup, task scheduling, map insertion, the notification to CORE)
 * are missing from this listing.
 */
6112 static void
6114 {
6115  struct DistanceVector *dv = hop->dv;
6116  struct VirtualLink *vl;
6117 
6118  vl = lookup_virtual_link (&dv->target);
6119  if (NULL != vl)
6120  {
6121  /* Link was already up, remember dv is also now available and we are done */
6122  vl->dv = dv;
6124  "Virtual link to %s could now also use DV!\n",
6125  GNUNET_i2s (&dv->target));
6126  return;
6127  }
6129  "Creating new virtual link to %s using DV!\n",
6130  GNUNET_i2s (&dv->target));
6131  vl = GNUNET_new (struct VirtualLink);
6132  vl->message_uuid_ctr =
6134  vl->target = dv->target;
 /* Link and route reference each other. */
6135  vl->dv = dv;
6136  dv->vl = vl;
6139  vl->visibility_task =
6143  links,
6144  &vl->target,
6145  vl,
6147  consider_sending_fc (vl);
6148  /* We lacked a confirmed connection to the target
6149  before, so tell CORE about it (finally!) */
6151 }
6152 
6153 
/**
 * Record a distance vector path.  `path[0]` must be our own identity,
 * `path[1]` a direct neighbour and `path[path_len - 1]` the target.
 * Paths that run through another direct neighbour are rejected.  A
 * path that is already known gets its timeout and validity refreshed;
 * a new path is stored unless #MAX_DV_PATHS_TO_TARGET shorter paths to
 * the target exist already.
 *
 * NOTE(review): the function-name line (declaring `path`) and several
 * statements (log calls, task scheduling, map insertion, timeout
 * computation, the calls made when the path is currently valid) are
 * missing from this listing.
 *
 * @param path_len number of entries in `path`, at least 3
 * @param network_latency stored as initial `aged_rtt` of a new hop
 * @param path_valid_until validity of the path; merged via maximum
 *        into a known path, stored as-is in a new one
 * @return #GNUNET_YES if the path was new or usefully refreshed (the
 *         log texts say: forward further),
 *         #GNUNET_NO if it was rediscovered too quickly or enough
 *         shorter paths exist,
 *         #GNUNET_SYSERR if the path is invalid or useless
 */
6179 static int
6181  unsigned int path_len,
6182  struct GNUNET_TIME_Relative network_latency,
6183  struct GNUNET_TIME_Absolute path_valid_until)
6184 {
6185  struct DistanceVectorHop *hop;
6186  struct DistanceVector *dv;
6187  struct Neighbour *next_hop;
6188  unsigned int shorter_distance;
6189 
6190  if (path_len < 3)
6191  {
6192  /* what a boring path! not allowed! */
6193  GNUNET_break (0);
6194  return GNUNET_SYSERR;
6195  }
6196  GNUNET_assert (0 == GNUNET_memcmp (&GST_my_identity, &path[0]));
6197  next_hop = lookup_neighbour (&path[1]);
6198  if (NULL == next_hop)
6199  {
6200  /* next hop must be a neighbour, otherwise this whole thing is useless! */
6201  GNUNET_break (0);
6202  return GNUNET_SYSERR;
6203  }
6204  for (unsigned int i = 2; i < path_len; i++)
6205  if (NULL != lookup_neighbour (&path[i]))
6206  {
6207  /* Useless path: we have a direct connection to some hop
6208  in the middle of the path, so this one is not even
6209  terribly useful for redundancy */
6211  "Path of %u hops useless: directly link to hop %u (%s)\n",
6212  path_len,
6213  i,
6214  GNUNET_i2s (&path[i]));
6215  GNUNET_STATISTICS_update (GST_stats,
6216  "# Useless DV path ignored: hop is neighbour",
6217  1,
6218  GNUNET_NO);
6219  return GNUNET_SYSERR;
6220  }
 /* Routes are keyed by the final peer on the path. */
6221  dv = GNUNET_CONTAINER_multipeermap_get (dv_routes, &path[path_len - 1]);
6222  if (NULL == dv)
6223  {
6224  dv = GNUNET_new (struct DistanceVector);
6225  dv->target = path[path_len - 1];
6227  &path_cleanup_cb,
6228  dv);
6231  dv_routes,
6232  &dv->target,
6233  dv,
6235  }
6236  /* Check if we have this path already! */
6237  shorter_distance = 0;
6238  for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
6239  pos = pos->next_dv)
6240  {
6241  if (pos->distance < path_len - 2)
6242  shorter_distance++;
6243  /* Note that the distances in 'pos' excludes us (path[0]) and
6244  the next_hop (path[1]), so we need to subtract two
6245  and check next_hop explicitly */
6246  if ((pos->distance == path_len - 2) && (pos->next_hop == next_hop))
6247  {
6248  int match = GNUNET_YES;
6249 
6250  for (unsigned int i = 0; i < pos->distance; i++)
6251  {
6252  if (0 != GNUNET_memcmp (&pos->path[i], &path[i + 2]))
6253  {
6254  match = GNUNET_NO;
6255  break;
6256  }
6257  }
6258  if (GNUNET_YES == match)
6259  {
6260  struct GNUNET_TIME_Relative last_timeout;
6261 
6262  /* Re-discovered known path, update timeout */
6263  GNUNET_STATISTICS_update (GST_stats,
6264  "# Known DV path refreshed",
6265  1,
6266  GNUNET_NO);
 /* Remember how much lifetime was left before the refresh; it is
    compared against a threshold further below. */
6267  last_timeout = GNUNET_TIME_absolute_get_remaining (pos->timeout);
6268  pos->timeout =
6270  pos->path_valid_until =
6271  GNUNET_TIME_absolute_max (pos->path_valid_until, path_valid_until);
 /* Remove and re-insert the entry in the route's hop list. */
6272  GNUNET_CONTAINER_MDLL_remove (dv, dv->dv_head, dv->dv_tail, pos);
6273  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, pos);
6274  if (0 <
6277  if (last_timeout.rel_value_us <
6280  .rel_value_us)
6281  {
6282  /* Some peer send DV learn messages too often, we are learning
6283  the same path faster than it would be useful; do not forward! */
6285  "Rediscovered path too quickly, not forwarding further\n");
6286  return GNUNET_NO;
6287  }
6289  "Refreshed known path to %s, forwarding further\n",
6290  GNUNET_i2s (&dv->target));
6291  return GNUNET_YES;
6292  }
6293  }
6294  }
6295  /* Count how many shorter paths we have (incl. direct
6296  neighbours) before simply giving up on this one! */
6297  if (shorter_distance >= MAX_DV_PATHS_TO_TARGET)
6298  {
6299  /* We have a shorter path already! */
6301  "Have many shorter DV paths %s, not forwarding further\n",
6302  GNUNET_i2s (&dv->target));
6303  return GNUNET_NO;
6304  }
6305  /* create new DV path entry */
6307  "Discovered new DV path to %s\n",
6308  GNUNET_i2s (&dv->target));
 /* Hop entry and the copy of the path (without ourselves and the next
    hop) share one allocation. */
6309  hop = GNUNET_malloc (sizeof (struct DistanceVectorHop) +
6310  sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
6311  hop->next_hop = next_hop;
6312  hop->dv = dv;
6313  hop->path = (const struct GNUNET_PeerIdentity *) &hop[1];
6314  memcpy (&hop[1],
6315  &path[2],
6316  sizeof (struct GNUNET_PeerIdentity) * (path_len - 2));
6318  hop->path_valid_until = path_valid_until;
6319  hop->distance = path_len - 2;
6320  hop->pd.aged_rtt = network_latency;
 /* The hop is a member of two lists: the route's and the neighbour's. */
6321  GNUNET_CONTAINER_MDLL_insert (dv, dv->dv_head, dv->dv_tail, hop);
6322  GNUNET_CONTAINER_MDLL_insert (neighbour,
6323  next_hop->dv_head,
6324  next_hop->dv_tail,
6325  hop);
6326  if (0 < GNUNET_TIME_absolute_get_remaining (path_valid_until).rel_value_us)
6328  return GNUNET_YES;
6329 }
6330 
6331 
6339 static int
6340 check_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
6341 {
6342  uint16_t size = ntohs (dvl->header.size);
6343  uint16_t num_hops = ntohs (dvl->num_hops);
6344  const struct DVPathEntryP *hops = (const struct DVPathEntryP *) &dvl[1];
6345 
6346  (void) cls;
6347  if (size != sizeof (*dvl) + num_hops * sizeof (struct DVPathEntryP))
6348  {
6349  GNUNET_break_op (0);
6350  return GNUNET_SYSERR;
6351  }
6352  if (num_hops > MAX_DV_HOPS_ALLOWED)
6353  {
6354  GNUNET_break_op (0);
6355  return GNUNET_SYSERR;
6356  }
6357  for (unsigned int i = 0; i < num_hops; i++)
6358  {
6359  if (0 == GNUNET_memcmp (&dvl->initiator, &hops[i].hop))
6360  {
6361  GNUNET_break_op (0);
6362  return GNUNET_SYSERR;
6363  }
6364  if (0 == GNUNET_memcmp (&GST_my_identity, &hops[i].hop))
6365  {
6366  GNUNET_break_op (0);
6367  return GNUNET_SYSERR;
6368  }
6369  }
6370  return GNUNET_YES;
6371 }
6372 
6373 
/**
 * Build and send a DV learn message to @a next_hop that extends the
 * received path by one hop (ourselves): the existing @a hops are
 * copied, our identity is appended, and our signature over
 * (predecessor, successor, challenge) is stored in the new entry.
 *
 * NOTE(review): several lines (log call, message type, delay
 * computation, signature purpose constant, the assertion around the
 * signing call and the final send call) are missing from this listing.
 *
 * @param next_hop peer the message is forwarded to
 * @param msg the message as received (source of initiator, challenge
 *        and initiator signature)
 * @param bi_history bitmask stored in the `bidirectional` field
 * @param nhops number of entries in @a hops
 * @param hops path entries received so far
 * @param in_time time the message was received
 */
6385 static void
6386 forward_dv_learn (const struct GNUNET_PeerIdentity *next_hop,
6387  const struct TransportDVLearnMessage *msg,
6388  uint16_t bi_history,
6389  uint16_t nhops,
6390  const struct DVPathEntryP *hops,
6391  struct GNUNET_TIME_Absolute in_time)
6392 {
6393  struct DVPathEntryP *dhops;
 /* Stack buffer for the outgoing message: header plus nhops + 1 entries. */
6394  char buf[sizeof (struct TransportDVLearnMessage) +
6395  (nhops + 1) * sizeof (struct DVPathEntryP)] GNUNET_ALIGN;
6396  struct TransportDVLearnMessage *fwd = (struct TransportDVLearnMessage *) buf;
6397  struct GNUNET_TIME_Relative nnd;
6398 
6399  /* compute message for forwarding */
6401  "Forwarding DV learn message originating from %s to %s\n",
6402  GNUNET_i2s (&msg->initiator),
6403  GNUNET_i2s2 (next_hop));
6406  fwd->header.size = htons (sizeof (struct TransportDVLearnMessage) +
6407  (nhops + 1) * sizeof (struct DVPathEntryP));
6408  fwd->num_hops = htons (nhops + 1);
6409  fwd->bidirectional = htons (bi_history);
6412  msg->non_network_delay));
6414  fwd->init_sig = msg->init_sig;
6415  fwd->initiator = msg->initiator;
6416  fwd->challenge = msg->challenge;
6417  dhops = (struct DVPathEntryP *) &fwd[1];
6418  GNUNET_memcpy (dhops, hops, sizeof (struct DVPathEntryP) * nhops);
6419  dhops[nhops].hop = GST_my_identity;
6420  {
 /* NOTE(review): for nhops == 0 (message received directly from the
    initiator) `dhops[nhops - 1]` indexes before the start of the
    array.  The verification loop in handle_dv_learn() uses the
    initiator as predecessor of hop 0, so the predecessor here should
    presumably be `msg->initiator` in that case -- confirm. */
6421  struct DvHopPS dhp = {.purpose.purpose =
6423  .purpose.size = htonl (sizeof (dhp)),
6424  .pred = dhops[nhops - 1].hop,
6425  .succ = *next_hop,
6426  .challenge = msg->challenge};
6427 
6429  GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
6430  &dhp.purpose,
6431  &dhops[nhops].hop_sig));
6432  }
6434  &fwd->header,
6436 }
6437 
6438 
/**
 * Check the initiator's signature of a DV learn message.  The signed
 * data consists of a purpose header, the sender's monotonic time and
 * the challenge; it is verified against the public key of @a init.
 *
 * NOTE(review): the function-name line, the purpose constant and the
 * first line of the verification call are missing from this listing.
 *
 * @param sender_monotonic_time monotonic time claimed by the initiator
 * @param init identity of the initiator
 * @param challenge challenge value that was signed
 * @param init_sig signature to verify
 * @return #GNUNET_OK if the signature is valid, #GNUNET_SYSERR if not
 */
6448 static int
6450  struct GNUNET_TIME_AbsoluteNBO sender_monotonic_time,
6451  const struct GNUNET_PeerIdentity *init,
6452  const struct ChallengeNonceP *challenge,
6453  const struct GNUNET_CRYPTO_EddsaSignature *init_sig)
6454 {
6455  struct DvInitPS ip = {.purpose.purpose = htonl (
6457  .purpose.size = htonl (sizeof (ip)),
6458  .monotonic_time = sender_monotonic_time,
6459  .challenge = *challenge};
6460 
6461  if (
6462  GNUNET_OK !=
6464  &ip.purpose,
6465  init_sig,
6466  &init->public_key))
6467  {
6468  GNUNET_break_op (0);
6469  return GNUNET_SYSERR;
6470  }
6471  return GNUNET_OK;
6472 }
6473 
6474 
/*
 * State shared by the neighbour iteration callbacks that pick the
 * peers a DV learn message is forwarded to.
 *
 * NOTE(review): the `struct` header line and the `dvl` member (the
 * message being forwarded, read by the callbacks) are missing from
 * this listing.
 */
6479 {
6484 
 /* Path entries of the received message. */
6488  const struct DVPathEntryP *hops;
6489 
 /* Time the message was received; passed on to forward_dv_learn(). */
6493  struct GNUNET_TIME_Absolute in_time;
6494 
 /* Indices (among the eligible neighbours) chosen for forwarding. */
6498  uint32_t selections[MAX_DV_DISCOVERY_SELECTION];
6499 
 /* Running count of eligible neighbours seen by an iteration. */
6503  unsigned int num_eligible;
6504 
 /* Number of valid entries in `selections`. */
6508  unsigned int num_selections;
6509 
 /* Number of entries in `hops`. */
6513  uint16_t nhops;
6514 
 /* Bitmask handed to forward_dv_learn() as `bi_history`. */
6518  uint16_t bi_history;
6519 };
6520 
6521 
/**
 * Neighbour iterator that counts the peers eligible for forwarding a
 * DV learn message: every neighbour except the initiator and the
 * peers already on the path increments `num_eligible`.
 *
 * NOTE(review): the line with the function name and the `cls`
 * parameter is missing from this listing.
 *
 * @param pid identity of the neighbour being visited
 * @param value map value (unused)
 * @return #GNUNET_YES (always continue iterating)
 */
6530 static int
6532  const struct GNUNET_PeerIdentity *pid,
6533  void *value)
6534 {
6535  struct NeighbourSelectionContext *nsc = cls;
6536 
6537  (void) value;
6538  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
6539  return GNUNET_YES; /* skip initiator */
6540  for (unsigned int i = 0; i < nsc->nhops; i++)
6541  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
6542  return GNUNET_YES; /* skip peers on path */
6543  nsc->num_eligible++;
6544  return GNUNET_YES;
6545 }
6546 
6547 
/**
 * Neighbour iterator that forwards the DV learn message to the
 * selected peers.  Eligibility is determined exactly as in the
 * counting pass (initiator and peers on the path are skipped); the
 * running index of eligible peers is compared with the entries of
 * `selections`, and a match triggers forward_dv_learn().
 *
 * NOTE(review): the line with the function name and the `cls`
 * parameter is missing from this listing.
 *
 * @param pid identity of the neighbour being visited
 * @param value map value (unused)
 * @return #GNUNET_YES (always continue iterating)
 */
6558 static int
6560  const struct GNUNET_PeerIdentity *pid,
6561  void *value)
6562 {
6563  struct NeighbourSelectionContext *nsc = cls;
6564 
6565  (void) value;
6566  if (0 == GNUNET_memcmp (pid, &nsc->dvl->initiator))
6567  return GNUNET_YES; /* skip initiator */
6568  for (unsigned int i = 0; i < nsc->nhops; i++)
6569  if (0 == GNUNET_memcmp (pid, &nsc->hops[i].hop))
6570  return GNUNET_YES; /* skip peers on path */
6571  for (unsigned int i = 0; i < nsc->num_selections; i++)
6572  {
6573  if (nsc->selections[i] == nsc->num_eligible)
6574  {
6575  forward_dv_learn (pid,
6576  nsc->dvl,
6577  nsc->bi_history,
6578  nsc->nhops,
6579  nsc->hops,
6580  nsc->in_time);
 /* Forward at most once per peer, even if it was selected twice. */
6581  break;
6582  }
6583  }
6584  nsc->num_eligible++;
6585  return GNUNET_YES;
6586 }
6587 
6588 
/**
 * Compute to how many neighbours a DV learn message should be
 * forwarded.  Starting from a fixed target of 50 peers, the expected
 * share reached at each earlier distance is subtracted; what remains
 * is scaled by the squared ratio of eligible to total neighbours and
 * by 2^-hops_taken, and then rounded probabilistically.
 *
 * NOTE(review): the random-number expression of the rounding step and
 * the opening line of the log call are missing from this listing.
 *
 * @param hops_taken number of hops the message travelled so far;
 *        values of 64 and above yield 0
 * @param neighbour_count total number of neighbours
 * @param eligible_count number of neighbours eligible for forwarding
 * @return number of neighbours to forward to
 */
6632 static unsigned int
6633 calculate_fork_degree (unsigned int hops_taken,
6634  unsigned int neighbour_count,
6635  unsigned int eligible_count)
6636 {
6637  double target_total = 50.0; /* FIXME: use LOG(NSE)? */
 /* NOTE(review): a neighbour_count of 0 makes this a floating point
    division by zero -- confirm callers exclude it. */
6638  double eligible_ratio =
6639  ((double) eligible_count) / ((double) neighbour_count);
6640  double boost_factor = eligible_ratio * eligible_ratio;
6641  unsigned int rnd;
6642  double left;
6643 
6644  if (hops_taken >= 64)
6645  {
6646  GNUNET_break (0);
6647  return 0; /* precaution given bitshift below */
6648  }
6649  for (unsigned int i = 1; i < hops_taken; i++)
6650  {
6651  /* For each hop, subtract the expected number of targets
6652  reached at distance d (so what remains divided by 2^d) */
6653  target_total -= (target_total * boost_factor / (1LLU << i));
6654  }
6655  rnd =
6656  (unsigned int) floor (target_total * boost_factor / (1LLU << hops_taken));
6657  /* round up or down probabilistically depending on how close we were
6658  when floor()ing to rnd */
 /* NOTE(review): `left` is taken from `target_total`, not from the
    scaled value that was passed to floor() above, so it is not the
    fraction lost by the rounding -- confirm this is intended. */
6659  left = target_total - (double) rnd;
6660  if (UINT32_MAX * left >
6662  rnd++; /* round up */
6664  "Forwarding DV learn message of %u hops %u(/%u/%u) times\n",
6665  hops_taken,
6666  rnd,
6667  eligible_count,
6668  neighbour_count);
6669  return rnd;
6670 }
6671 
6672 
/**
 * Completion callback for storing a peer's monotonic time in the
 * peerstore: clears the neighbour's store handle `sc` and reports a
 * failure.
 *
 * NOTE(review): the opening line of the log call is missing from this
 * listing.
 *
 * @param cls the `struct Neighbour` the store operation belonged to
 * @param success #GNUNET_YES if the store operation succeeded
 */
6679 static void
6680 neighbour_store_dvmono_cb (void *cls, int success)
6681 {
6682  struct Neighbour *n = cls;
6683 
 /* The operation is finished; the handle must not be cancelled later. */
6684  n->sc = NULL;
6685  if (GNUNET_YES != success)
6687  "Failed to store other peer's monotonic time in peerstore!\n");
6688 }
6689 
6690 
/**
 * Handle an incoming DV learn message: check that the sender matches
 * the last hop (or the initiator for an empty path), verify the
 * initiator's and every hop's signature, learn paths from the message
 * (the forward path if we initiated it, inverse paths over
 * bi-directional hops otherwise) and finally forward it to the
 * initiator and/or a random selection of eligible neighbours.
 *
 * NOTE(review): many lines are missing from this listing, among them
 * the declaration of `cc`, the monotonic time comparison, the
 * signature verification calls, several log calls, trailing arguments
 * of the learn_dv_path() calls, the neighbour lookup for the
 * initiator and the neighbour iteration calls.
 *
 * @param cls a `struct CommunicatorMessageContext`
 * @param dvl the message that was received
 */
6698 static void
6699 handle_dv_learn (void *cls, const struct TransportDVLearnMessage *dvl)
6700 {
6701  struct CommunicatorMessageContext *cmc = cls;
6703  int bi_hop;
6704  uint16_t nhops;
6705  uint16_t bi_history;
6706  const struct DVPathEntryP *hops;
6707  int do_fwd;
6708  int did_initiator;
6709  struct GNUNET_TIME_Absolute in_time;
6710  struct Neighbour *n;
6711 
 /* NOTE(review): `nhops` is read from `dvl->bidirectional`, the same
    field as `bi_history` on the next line, while check_dv_learn()
    validates `dvl->num_hops`.  This looks like it should be
    `ntohs (dvl->num_hops)`; as written `nhops` is not bounded by the
    message size -- confirm and fix. */
6712  nhops = ntohs (dvl->bidirectional); /* 0 = sender is initiator */
6713  bi_history = ntohs (dvl->bidirectional);
6714  hops = (const struct DVPathEntryP *) &dvl[1];
6715  if (0 == nhops)
6716  {
6717  /* sanity check */
6718  if (0 != GNUNET_memcmp (&dvl->initiator, &cmc->im.sender))
6719  {
6720  GNUNET_break (0);
6721  finish_cmc_handling (cmc);
6722  return;
6723  }
6724  }
6725  else
6726  {
6727  /* sanity check */
6728  if (0 != GNUNET_memcmp (&hops[nhops - 1].hop, &cmc->im.sender))
6729  {
6730  GNUNET_break (0);
6731  finish_cmc_handling (cmc);
6732  return;
6733  }
6734  }
6735 
6736  GNUNET_assert (CT_COMMUNICATOR == cmc->tc->type);
6737  cc = cmc->tc->details.communicator.cc;
 /* The last hop counts as bi-directional if the delivering
    communicator is a reliable one. */
6738  bi_hop = (GNUNET_TRANSPORT_CC_RELIABLE ==
6739  cc); // FIXME: add bi-directional flag to cc?
6740  in_time = GNUNET_TIME_absolute_get ();
6741 
6742  /* continue communicator here, everything else can happen asynchronous! */
6743  finish_cmc_handling (cmc);
6744 
6745  n = lookup_neighbour (&dvl->initiator);
6746  if (NULL != n)
6747  {
 /* The initiator is a direct neighbour: check its monotonic time and
    its signature, then persist the new monotonic time. */
6748  if ((n->dv_monotime_available == GNUNET_YES) &&
6751  {
6752  GNUNET_STATISTICS_update (GST_stats,
6753  "# DV learn discarded due to time travel",
6754  1,
6755  GNUNET_NO);
6756  return;
6757  }
6759  &dvl->initiator,
6760  &dvl->challenge,
6761  &dvl->init_sig))
6762  {
6763  GNUNET_break_op (0);
6764  return;
6765  }
6768  {
6769  if (NULL != n->sc)
6771  n->sc =
6772  GNUNET_PEERSTORE_store (peerstore,
6773  "transport",
6774  &dvl->initiator,
6776  &dvl->monotonic_time,
6777  sizeof (dvl->monotonic_time),
6781  n);
6782  }
6783  }
6784  /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
6785  If signature verification load too high, implement random drop strategy */
6786  for (unsigned int i = 0; i < nhops; i++)
6787  {
 /* Each hop signed (predecessor, successor, challenge); the first
    hop's predecessor is the initiator, the last hop's successor is
    this peer. */
6788  struct DvHopPS dhp = {.purpose.purpose =
6790  .purpose.size = htonl (sizeof (dhp)),
6791  .pred = (0 == i) ? dvl->initiator : hops[i - 1].hop,
6792  .succ = (nhops == i + 1) ? GST_my_identity
6793  : hops[i + 1].hop,
6794  .challenge = dvl->challenge};
6795 
6796  if (GNUNET_OK !=
6798  &dhp.purpose,
6799  &hops[i].hop_sig,
6800  &hops[i].hop.public_key))
6801  {
6802  GNUNET_break_op (0);
6803  return;
6804  }
6805  }
6806 
6807  if (GNUNET_EXTRA_LOGGING > 0)
6808  {
 /* Render the path as a string, for logging only. */
6809  char *path;
6810 
6811  path = GNUNET_strdup (GNUNET_i2s (&dvl->initiator));
6812  for (unsigned int i = 0; i < nhops; i++)
6813  {
6814  char *tmp;
6815 
6816  GNUNET_asprintf (&tmp,
6817  "%s%s%s",
6818  path,
6819  (bi_history & (1 << (nhops - i))) ? "<->" : "-->",
6820  GNUNET_i2s (&hops[i].hop));
6821  GNUNET_free (path);
6822  path = tmp;
6823  }
6825  "Received DVInit via %s%s%s\n",
6826  path,
6827  bi_hop ? "<->" : "-->",
6829  GNUNET_free (path);
6830  }
6831 
6832  do_fwd = GNUNET_YES;
6833  if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
6834  {
6835  struct GNUNET_PeerIdentity path[nhops + 1];
6836  struct GNUNET_TIME_Relative host_latency_sum;
6837  struct GNUNET_TIME_Relative latency;
6838  struct GNUNET_TIME_Relative network_latency;
6839 
6840  /* We initiated this, learn the forward path! */
6841  path[0] = GST_my_identity;
6842  path[1] = hops[0].hop;
6843  host_latency_sum = GNUNET_TIME_relative_ntoh (dvl->non_network_delay);
6844 
6845  // Need also something to lookup initiation time
6846  // to compute RTT! -> add RTT argument here?
6847  latency = GNUNET_TIME_UNIT_FOREVER_REL; // FIXME: initialize properly
6848  // (based on dvl->challenge, we can identify time of origin!)
6849 
6850  network_latency = GNUNET_TIME_relative_subtract (latency, host_latency_sum);
6851  /* assumption: latency on all links is the same */
6852  network_latency = GNUNET_TIME_relative_divide (network_latency, nhops);
6853 
 /* Every prefix of the path with at least 3 entries (including us)
    is a path of its own. */
6854  for (unsigned int i = 2; i <= nhops; i++)
6855  {
6856  struct GNUNET_TIME_Relative ilat;
6857 
6858  /* assumption: linear latency increase per hop */
6859  ilat = GNUNET_TIME_relative_multiply (network_latency, i);
6860  path[i] = hops[i - 1].hop;
6862  "Learned path with %u hops to %s with latency %s\n",
6863  i,
6864  GNUNET_i2s (&path[i]),
6866  learn_dv_path (path,
6867  i,
6868  ilat,
6871  }
6872  /* as we initiated, do not forward again (would be circular!) */
 /* NOTE(review): this assignment is dead, the function returns on the
    next line. */
6873  do_fwd = GNUNET_NO;
6874  return;
6875  }
6876  if (bi_hop)
6877  {
6878  /* last hop was bi-directional, we could learn something here! */
6879  struct GNUNET_PeerIdentity path[nhops + 2];
6880 
6881  path[0] = GST_my_identity;
6882  path[1] = hops[nhops - 1].hop; /* direct neighbour == predecessor! */
 /* Walk the received path backwards towards the initiator for as long
    as the hops are bi-directional. */
6883  for (unsigned int i = 0; i < nhops; i++)
6884  {
6885  int iret;
6886 
6887  if (0 == (bi_history & (1 << i)))
6888  break; /* i-th hop not bi-directional, stop learning! */
6889  if (i == nhops - 1)
6890  {
6891  path[i + 2] = dvl->initiator;
6892  }
6893  else
6894  {
6895  path[i + 2] = hops[nhops - i - 2].hop;
6896  }
6897 
6899  "Learned inverse path with %u hops to %s\n",
6900  i + 1,
6901  GNUNET_i2s (&path[i + 2]));
6902  iret = learn_dv_path (path,
6903  i + 2,
6906  if (GNUNET_SYSERR == iret)
6907  {
6908  /* path invalid or too long to be interesting for US, thus should also
6909  not be interesting to our neighbours, cut path when forwarding to
6910  'i' hops, except of course for the one that goes back to the
6911  initiator */
6912  GNUNET_STATISTICS_update (GST_stats,
6913  "# DV learn not forwarded due invalidity of path",
6914  1,
6915  GNUNET_NO);
6916  do_fwd = GNUNET_NO;
6917  break;
6918  }
6919  if ((GNUNET_NO == iret) && (nhops == i + 1))
6920  {
6921  /* we have better paths, and this is the longest target,
6922  so there cannot be anything interesting later */
6923  GNUNET_STATISTICS_update (GST_stats,
6924  "# DV learn not forwarded, got better paths",
6925  1,
6926  GNUNET_NO);
6927  do_fwd = GNUNET_NO;
6928  break;
6929  }
6930  }
6931  }
6932 
6933  if (MAX_DV_HOPS_ALLOWED == nhops)
6934  {
6935  /* At limit, we're out of here! */
 /* NOTE(review): finish_cmc_handling() was already called on `cmc`
    above ("continue communicator here"); this second call looks like
    a double completion -- confirm and remove. */
6936  finish_cmc_handling (cmc);
6937  return;
6938  }
6939 
6940  /* Forward to initiator, if path non-trivial and possible */
 /* Record the directionality of the hop that just delivered the
    message in the lowest bit. */
6941  bi_history = (bi_history << 1) | (bi_hop ? 1 : 0);
6942  did_initiator = GNUNET_NO;
6943  if ((1 < nhops) &&
6944  (GNUNET_YES ==
6946  {
6947  /* send back to origin! */
6949  "Sending DVL back to initiator %s\n",
6950  GNUNET_i2s (&dvl->initiator));
6951  forward_dv_learn (&dvl->initiator, dvl, bi_history, nhops, hops, in_time);
6952  did_initiator = GNUNET_YES;
6953  }
6954  /* We forward under two conditions: either we still learned something
6955  ourselves (do_fwd), or the path was darn short and thus the initiator is
6956  likely to still be very interested in this (and we did NOT already
6957  send it back to the initiator) */
6958  if ((do_fwd) || ((nhops < MIN_DV_PATH_LENGTH_FOR_INITIATOR) &&
6959  (GNUNET_NO == did_initiator)))
6960  {
6961  /* Pick random neighbours that are not yet on the path */
6962  struct NeighbourSelectionContext nsc;
6963  unsigned int n_cnt;
6964 
6965  n_cnt = GNUNET_CONTAINER_multipeermap_size (neighbours);
6966  nsc.nhops = nhops;
6967  nsc.dvl = dvl;
6968  nsc.bi_history = bi_history;
6969  nsc.hops = hops;
6970  nsc.in_time = in_time;
 /* First pass over the neighbours: count the eligible ones. */
6971  nsc.num_eligible = 0;
6974  &nsc);
6975  if (0 == nsc.num_eligible)
6976  return; /* done here, cannot forward to anyone else */
6977  nsc.num_selections = calculate_fork_degree (nhops, n_cnt, nsc.num_eligible);
6978  nsc.num_selections =
6981  "Forwarding DVL to %u other peers\n",
6982  nsc.num_selections);
6983  for (unsigned int i = 0; i < nsc.num_selections; i++)
6984  nsc.selections[i] =
6985  (nsc.num_selections == n_cnt)
6986  ? i /* all were selected, avoid collisions by chance */
 /* Second pass: the counter is reset so that it indexes the eligible
    neighbours again while the selected ones are served. */
6988  nsc.num_eligible = 0;
6991  &nsc);
6992  }
6993 }
6994 
6995 
7003 static int
7004 check_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
7005 {
7006  uint16_t size = ntohs (dvb->header.size);
7007  uint16_t num_hops = ntohs (dvb->num_hops);
7008  const struct GNUNET_PeerIdentity *hops =
7009  (const struct GNUNET_PeerIdentity *) &dvb[1];
7010 
7011  (void) cls;
7012  if (size < sizeof (*dvb) + num_hops * sizeof (struct GNUNET_PeerIdentity) +
7013  sizeof (struct GNUNET_MessageHeader))
7014  {
7015  GNUNET_break_op (0);
7016  return GNUNET_SYSERR;
7017  }
7018  /* This peer must not be on the path */
7019  for (unsigned int i = 0; i < num_hops; i++)
7020  if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
7021  {
7022  GNUNET_break_op (0);
7023  return GNUNET_SYSERR;
7024  }
7025  return GNUNET_YES;
7026 }
7027 
7028 
/**
 * Queue a DV box for transmission to @a next_hop.
 *
 * Allocates a struct PendingMessage with the box stored right behind it:
 * a copy of @a hdr, then @a num_hops identities taken from @a hops, then
 * @a enc_payload_size bytes of @a enc_payload.  The pending message is
 * tagged #PMT_DV_BOX, gets prefs #GNUNET_MQ_PRIO_BACKGROUND, and finally
 * check_vl_transmission() is called on the neighbour's virtual link.
 *
 * NOTE(review): this listing omits some original lines (the embedded line
 * numbers jump 7061->7063, 7071->7073 and 7075->7077).  The dropped lines
 * are presumably one more statement on pm, the first line of the list
 * insertion whose arguments remain below, and the first line of the log
 * call -- confirm against the real file.
 *
 * @param next_hop neighbour to transmit to; its virtual link must not be NULL
 * @param hdr header of the box being forwarded
 * @param total_hops appears only in the message arguments near the end
 * @param num_hops number of entries in @a hops
 * @param hops hop identities to append after the header
 * @param enc_payload encrypted payload to append after the hops
 * @param enc_payload_size number of bytes in @a enc_payload
 */
7041 static void
7042 forward_dv_box (struct Neighbour *next_hop,
7043  const struct TransportDVBoxMessage *hdr,
7044  uint16_t total_hops,
7045  uint16_t num_hops,
7046  const struct GNUNET_PeerIdentity *hops,
7047  const void *enc_payload,
7048  uint16_t enc_payload_size)
7049 {
7050  struct VirtualLink *vl = next_hop->vl;
7051  struct PendingMessage *pm;
7052  size_t msg_size;
7053  char *buf;
7054  struct GNUNET_PeerIdentity *dhops;
7055 
7056  GNUNET_assert (NULL != vl);
  /* wire size of the box: fixed header + hop array + encrypted payload */
7057  msg_size = sizeof (struct TransportDVBoxMessage) +
7058  num_hops * sizeof (struct GNUNET_PeerIdentity) + enc_payload_size;
7059  pm = GNUNET_malloc (sizeof (struct PendingMessage) + msg_size);
7060  pm->pmt = PMT_DV_BOX;
7061  pm->vl = vl;
7063  pm->logging_uuid = logging_uuid_gen++;
7064  pm->prefs = GNUNET_MQ_PRIO_BACKGROUND;
7065  pm->bytes_msg = msg_size;
  /* the message bytes live directly behind the PendingMessage struct */
7066  buf = (char *) &pm[1];
  /* NOTE(review): the header is copied unmodified in the visible code, so
     its size/hop-count fields keep the values from @a hdr even though
     possibly fewer hops follow -- verify this is intended. */
7067  memcpy (buf, hdr, sizeof (*hdr));
7068  dhops =
7069  (struct GNUNET_PeerIdentity *) &buf[sizeof (struct TransportDVBoxMessage)];
7070  memcpy (dhops, hops, num_hops * sizeof (struct GNUNET_PeerIdentity));
7071  memcpy (&dhops[num_hops], enc_payload, enc_payload_size);
  /* (first line of the call taking these arguments is missing from the listing) */
7073  vl->pending_msg_head,
7074  vl->pending_msg_tail,
7075  pm);
  /* (first line of the call taking these arguments is missing from the listing) */
7077  "Created pending message %llu for DV Box with next hop %s (%u/%u)\n",
7078  pm->logging_uuid,
7079  GNUNET_i2s (&next_hop->pid),
7080  (unsigned int) num_hops,
7081  (unsigned int) total_hops);
7082  check_vl_transmission (vl);
7083 }
7084 
7085 
/**
 * Release a backtalker entry @a b: reset its pending get/task/sc handles,
 * finish the communicator message context that is still attached while a
 * get is pending, remove the entry from the backtalkers map and free it.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing (presumably free_backtalker (struct Backtalker
 * *b), judging by the callers), as is the first statement inside each of
 * the three if-blocks (presumably the matching cancel call) -- confirm.
 */
7091 static void
7093 {
7094  if (NULL != b->get)
7095  {
7097  b->get = NULL;
  /* a pending get implies a message context is still waiting on us */
7098  GNUNET_assert (NULL != b->cmc);
7099  finish_cmc_handling (b->cmc);
7100  b->cmc = NULL;
7101  }
7102  if (NULL != b->task)
7103  {
7105  b->task = NULL;
7106  }
7107  if (NULL != b->sc)
7108  {
7110  b->sc = NULL;
7111  }
  /* the entry must have been registered in the map under its own pid */
7112  GNUNET_assert (
7113  GNUNET_YES ==
7114  GNUNET_CONTAINER_multipeermap_remove (backtalkers, &b->pid, b));
7115  GNUNET_free (b);
7116 }
7117 
7118 
/**
 * Map-iterator style callback that frees the backtalker passed as @a value.
 *
 * NOTE(review): the line with the function name and first parameter
 * (the closure, unused) is missing from this listing.
 *
 * @param pid peer identity of the entry, unused
 * @param value the struct Backtalker to free
 * @return #GNUNET_OK always
 */
7127 static int
7129  const struct GNUNET_PeerIdentity *pid,
7130  void *value)
7131 {
7132  struct Backtalker *b = value;
7133 
7134  (void) cls;
7135  (void) pid;
7136  free_backtalker (b);
7137  return GNUNET_OK;
7138 }
7139 
7140 
/**
 * Task callback for a backtalker (passed as closure).  Clears the task
 * handle; unless the (not shown) condition causes an early return, the
 * backtalker is freed.
 *
 * NOTE(review): the function-name line, the condition of the if-block and
 * the statement inside it are missing from this listing; presumably the
 * condition tests whether the timeout has not been reached yet and the
 * body re-schedules the task -- confirm against the real file.
 */
7146 static void
7148 {
7149  struct Backtalker *b = cls;
7150 
7151  b->task = NULL;
7153  {
7155  return;
7156  }
  /* no store operation may be pending when the entry is discarded */
7157  GNUNET_assert (NULL == b->sc);
7158  free_backtalker (b);
7159 }
7160 
7161 
/**
 * PEERSTORE record callback for a backtalker (passed as closure).
 *
 * A NULL @a record marks the end of the iteration: the get handle is
 * cleared and the buffered body (stored right behind the Backtalker
 * struct) is processed if body_size is non-zero, otherwise the message
 * context is simply finished.  A non-NULL record must contain exactly one
 * struct GNUNET_TIME_AbsoluteNBO; under the (not shown) condition the
 * message is counted as dropped, the stored time is adopted and body_size
 * is zeroed so that the end-of-iteration branch will not forward the body.
 *
 * NOTE(review): the function-name line (7171), the call at 7187, the
 * condition at 7201 and the first lines of the log/statistics calls are
 * missing from this listing.
 *
 * @param record the record found, or NULL at the end of the iteration
 * @param emsg error message, ignored
 */
7170 static void
7172  const struct GNUNET_PEERSTORE_Record *record,
7173  const char *emsg)
7174 {
7175  struct Backtalker *b = cls;
7176  struct GNUNET_TIME_AbsoluteNBO *mtbe;
7177  struct GNUNET_TIME_Absolute mt;
7178 
7179  (void) emsg;
7180  if (NULL == record)
7181  {
7182  /* we're done with #backtalker_monotime_cb() invocations,
7183  continue normal processing */
7184  b->get = NULL;
7185  GNUNET_assert (NULL != b->cmc);
7186  if (0 != b->body_size)
  /* (first line of this call is missing from the listing) */
7188  (const struct GNUNET_MessageHeader *) &b[1]);
7189  else
7190  finish_cmc_handling (b->cmc);
7191  b->cmc = NULL;
7192  return;
7193  }
  /* stored value must be exactly one network-byte-order timestamp */
7194  if (sizeof (*mtbe) != record->value_size)
7195  {
7196  GNUNET_break (0);
7197  return;
7198  }
7199  mtbe = record->value;
7200  mt = GNUNET_TIME_absolute_ntoh (*mtbe);
7202  {
7204  "Backtalker message from %s dropped, monotime in the past\n",
7205  GNUNET_i2s (&b->pid));
7207  GST_stats,
7208  "# Backchannel messages dropped: monotonic time not increasing",
7209  1,
7210  GNUNET_NO);
7211  b->monotonic_time = mt;
7212  /* Setting body_size to 0 prevents call to #forward_backchannel_payload()
7213  */
7214  b->body_size = 0;
7215  return;
7216  }
7217 }
7218 
7219 
/**
 * Completion callback for storing a backtalker's monotonic time.
 * On failure a message is emitted; in either case the store handle is
 * cleared.
 *
 * NOTE(review): the first line of the log call (7234) and one statement
 * after `b->sc = NULL` (7238) are missing from this listing.
 *
 * @param cls the struct Backtalker
 * @param success #GNUNET_OK if the store operation succeeded
 */
7227 static void
7228 backtalker_monotime_store_cb (void *cls, int success)
7229 {
7230  struct Backtalker *b = cls;
7231 
7232  if (GNUNET_OK != success)
7233  {
7235  "Failed to store backtalker's monotonic time in PEERSTORE!\n");
7236  }
7237  b->sc = NULL;
7239 }
7240 
7241 
/**
 * Start a PEERSTORE store operation for backtalker @a b under the
 * "transport" subsystem and keep its handle in b->sc.  Beforehand either
 * the previous store handle or the task handle is reset, depending on
 * whether a store was still pending.
 *
 * NOTE(review): the function-name line (7248), the statements at 7254 and
 * 7259 (presumably the cancel calls), the initialisation of mtbe (7262)
 * and several arguments of GNUNET_PEERSTORE_store (7267, 7270-7272) are
 * missing from this listing.
 */
7247 static void
7249 {
7250  struct GNUNET_TIME_AbsoluteNBO mtbe;
7251 
7252  if (NULL != b->sc)
7253  {
7255  b->sc = NULL;
7256  }
7257  else
7258  {
7260  b->task = NULL;
7261  }
7263  b->sc =
7264  GNUNET_PEERSTORE_store (peerstore,
7265  "transport",
7266  &b->pid,
7268  &mtbe,
7269  sizeof (mtbe),
7273  b);
7274 }
7275 
7276 
/**
 * Handle a DV box received from a communicator.
 *
 * If the box still lists hops it is forwarded: the hop array is scanned
 * from its end, and the box is handed to forward_dv_box() for the first
 * hop that lookup_neighbour() knows, carrying only the hops after that
 * one.  If none of the listed hops is a neighbour the box is dropped.
 * If no hops are listed this peer is the target: the HMAC is verified,
 * the payload is decrypted, the inner message type is checked, and the
 * sender's monotonic time and (for an unknown sender or a new ephemeral
 * key) signature are validated.  For a known backtalker the inner message
 * is then passed to demultiplex_with_cmc(); for a new one a Backtalker
 * entry holding a copy of the body is created and a PEERSTORE iteration
 * is started.
 *
 * NOTE(review): this listing omits a number of original lines (gaps in the
 * embedded numbering, e.g. 7303->7305, 7406->7408, 7481->7484); calls
 * whose first line is missing are left exactly as found.
 *
 * @param cls the struct CommunicatorMessageContext of this message
 * @param dvb the DV box message
 */
7284 static void
7285 handle_dv_box (void *cls, const struct TransportDVBoxMessage *dvb)
7286 {
7287  struct CommunicatorMessageContext *cmc = cls;
  /* bytes following the fixed header: hop array plus encrypted payload */
7288  uint16_t size = ntohs (dvb->header.size) - sizeof (*dvb);
7289  uint16_t num_hops = ntohs (dvb->num_hops);
7290  const struct GNUNET_PeerIdentity *hops =
7291  (const struct GNUNET_PeerIdentity *) &dvb[1];
7292  const char *enc_payload = (const char *) &hops[num_hops];
7293  uint16_t enc_payload_size =
7294  size - (num_hops * sizeof (struct GNUNET_PeerIdentity));
7295  struct DVKeyState key;
7296  struct GNUNET_HashCode hmac;
7297  const char *hdr;
7298  size_t hdr_len;
7299 
  /* debug aid: build a "->a->b..." string of the remaining hops */
7300  if (GNUNET_EXTRA_LOGGING > 0)
7301  {
7302  char *path;
7303 
7305  for (unsigned int i = 0; i < num_hops; i++)
7306  {
7307  char *tmp;
7308 
7309  GNUNET_asprintf (&tmp, "%s->%s", path, GNUNET_i2s (&hops[i]));
7310  GNUNET_free (path);
7311  path = tmp;
7312  }
7314  "Received DVBox with remainig path %s\n",
7315  path);
7316  GNUNET_free (path);
7317  }
7318 
7319  if (num_hops > 0)
7320  {
7321  /* We're trying from the end of the hops array, as we may be
7322  able to find a shortcut unknown to the origin that way */
7323  for (int i = num_hops - 1; i >= 0; i--)
7324  {
7325  struct Neighbour *n;
7326 
  /* we must never be on the remaining path ourselves */
7327  if (0 == GNUNET_memcmp (&hops[i], &GST_my_identity))
7328  {
7329  GNUNET_break_op (0);
7330  finish_cmc_handling (cmc);
7331  return;
7332  }
7333  n = lookup_neighbour (&hops[i]);
7334  if (NULL == n)
7335  continue;
7337  "Skipping %u/%u hops ahead while routing DV Box\n",
7338  i,
7339  num_hops);
7340  forward_dv_box (n,
7341  dvb,
7342  ntohs (dvb->total_hops) + 1,
7343  num_hops - i - 1, /* number of hops left */
7344  &hops[i + 1], /* remaining hops */
7345  enc_payload,
7346  enc_payload_size);
7347  GNUNET_STATISTICS_update (GST_stats,
7348  "# DV hops skipped routing boxes",
7349  i,
7350  GNUNET_NO);
7351  GNUNET_STATISTICS_update (GST_stats,
7352  "# DV boxes routed (total)",
7353  1,
7354  GNUNET_NO);
7355  finish_cmc_handling (cmc);
7356  return;
7357  }
7358  /* Woopsie, next hop not in neighbours, drop! */
7359  GNUNET_STATISTICS_update (GST_stats,
7360  "# DV Boxes dropped: next hop unknown",
7361  1,
7362  GNUNET_NO);
7363  finish_cmc_handling (cmc);
7364  return;
7365  }
7366  /* We are the target. Unbox and handle message. */
7367  GNUNET_STATISTICS_update (GST_stats,
7368  "# DV boxes opened (ultimate target)",
7369  1,
7370  GNUNET_NO);
7371  cmc->total_hops = ntohs (dvb->total_hops);
7372 
7373  dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, &key);
  /* with zero hops, everything behind the fixed header is the ciphertext */
7374  hdr = (const char *) &dvb[1];
7375  hdr_len = ntohs (dvb->header.size) - sizeof (*dvb);
7376  dv_hmac (&key, &hmac, hdr, hdr_len);
7377  if (0 != GNUNET_memcmp (&hmac, &dvb->hmac))
7378  {
7379  /* HMAC mismatch, discard! */
  /* NOTE(review): this early return skips dv_key_clean (&key), which the
     path below calls after decrypting -- confirm whether that matters. */
7380  GNUNET_break_op (0);
7381  finish_cmc_handling (cmc);
7382  return;
7383  }
7384  /* begin actual decryption */
7385  {
7386  struct Backtalker *b;
7387  struct GNUNET_TIME_Absolute monotime;
7388  struct TransportDVBoxPayloadP ppay;
  /* NOTE(review): the array size is computed before the GNUNET_assert
     below runs; this is only safe if the message check already guarantees
     hdr_len >= sizeof (ppay) + sizeof (struct GNUNET_MessageHeader) --
     confirm. */
7389  char body[hdr_len - sizeof (ppay)] GNUNET_ALIGN;
7390  const struct GNUNET_MessageHeader *mh =
7391  (const struct GNUNET_MessageHeader *) body;
7392 
7393  GNUNET_assert (hdr_len >=
7394  sizeof (ppay) + sizeof (struct GNUNET_MessageHeader));
  /* first the payload header, then the inner message that follows it */
7395  dv_decrypt (&key, &ppay, hdr, sizeof (ppay));
7396  dv_decrypt (&key, &body, &hdr[sizeof (ppay)], hdr_len - sizeof (ppay));
7397  dv_key_clean (&key);
  /* the inner message must fill the decrypted body exactly */
7398  if (ntohs (mh->size) != sizeof (body))
7399  {
7400  GNUNET_break_op (0);
7401  finish_cmc_handling (cmc);
7402  return;
7403  }
7404  /* need to prevent box-in-a-box (and DV_LEARN) so check inbox type! */
7405  switch (ntohs (mh->type))
7406  {
  /* (the two case labels, lines 7407 and 7411, are missing from the listing) */
7408  GNUNET_break_op (0);
7409  finish_cmc_handling (cmc);
7410  return;
7412  GNUNET_break_op (0);
7413  finish_cmc_handling (cmc);
7414  return;
7415  default:
7416  /* permitted, continue */
7417  break;
7418  }
7419  monotime = GNUNET_TIME_absolute_ntoh (ppay.monotonic_time);
7421  "Decrypted backtalk from %s\n",
7422  GNUNET_i2s (&ppay.sender));
7423  b = GNUNET_CONTAINER_multipeermap_get (backtalkers, &ppay.sender);
  /* known sender: reject messages whose monotonic time went backwards */
7424  if ((NULL != b) && (monotime.abs_value_us < b->monotonic_time.abs_value_us))
7425  {
7427  GST_stats,
7428  "# Backchannel messages dropped: monotonic time not increasing",
7429  1,
7430  GNUNET_NO);
7431  finish_cmc_handling (cmc);
7432  return;
7433  }
  /* the signature is only (re)checked for an unknown sender or when the
     ephemeral key differs from the cached one */
7434  if ((NULL == b) ||
7435  (0 != GNUNET_memcmp (&b->last_ephemeral, &dvb->ephemeral_key)))
7436  {
7437  /* Check signature */
7438  struct EphemeralConfirmationPS ec;
7439 
7441  ec.purpose.size = htonl (sizeof (ec));
7442  ec.target = GST_my_identity;
7443  ec.ephemeral_key = dvb->ephemeral_key;
7444  if (
7445  GNUNET_OK !=
7447  &ec.purpose,
7448  &ppay.sender_sig,
7449  &ppay.sender.public_key))
7450  {
7451  /* Signature invalid, discard! */
7452  GNUNET_break_op (0);
7453  finish_cmc_handling (cmc);
7454  return;
7455  }
7456  }
7457  /* Update sender, we now know the real origin! */
7459  "DVBox received for me from %s via %s\n",
7460  GNUNET_i2s2 (&ppay.sender),
7461  GNUNET_i2s (&cmc->im.sender));
7462  cmc->im.sender = ppay.sender;
7463 
7464  if (NULL != b)
7465  {
7466  /* update key cache and mono time */
7467  b->last_ephemeral = dvb->ephemeral_key;
7468  b->monotonic_time = monotime;
7470  b->timeout =
7472 
7473  demultiplex_with_cmc (cmc, mh);
7474  return;
7475  }
7476  /* setup data structure to cache signature AND check
7477  monotonic time with PEERSTORE before forwarding backchannel payload */
  /* the body is copied behind the Backtalker struct so it outlives this call */
7478  b = GNUNET_malloc (sizeof (struct Backtalker) + sizeof (body));
7479  b->pid = ppay.sender;
7480  b->body_size = sizeof (body);
7481  memcpy (&b[1], body, sizeof (body));
7484  backtalkers,
7485  &b->pid,
7486  b,
7488  b->monotonic_time = monotime; /* NOTE: to be checked still! */
7489  b->cmc = cmc;
7490  b->timeout =
7493  b->get =
7494  GNUNET_PEERSTORE_iterate (peerstore,
7495  "transport",
7496  &b->pid,
7499  b);
7500  } /* end actual decryption */
7501 }
7502 
7503 
/**
 * Validate an incoming-message notification from a client: the client
 * (passed as closure) must be of type #CT_COMMUNICATOR.
 *
 * NOTE(review): the function-name line (7512) and one statement before the
 * final return (7522, presumably a check of the boxed message) are missing
 * from this listing.
 *
 * @param im the message to validate
 * @return #GNUNET_OK if accepted, #GNUNET_SYSERR if the client is not a
 *         communicator
 */
7511 static int
7513  const struct GNUNET_TRANSPORT_IncomingMessage *im)
7514 {
7515  struct TransportClient *tc = cls;
7516 
7517  if (CT_COMMUNICATOR != tc->type)
7518  {
7519  GNUNET_break (0);
7520  return GNUNET_SYSERR;
7521  }
7523  return GNUNET_OK;
7524 }
7525 
7526 
/* Body of a struct whose opening `struct ...` line is missing from this
   listing; presumably struct CheckKnownAddressContext, which code in this
   file uses with the members `address` and `vs`.  The line declaring `vs`
   is missing as well. */
7531 {
  /* address string that is compared (strcmp) against known validation states */
7535  const char *address;
7536 
7541 };
7542 
7543 
/**
 * Map-iterator style callback: looks for the validation state whose
 * address equals the one in the struct CheckKnownAddressContext closure.
 *
 * NOTE(review): the line with the function name and the closure parameter
 * is missing from this listing.
 *
 * @param pid peer identity of the entry, unused
 * @param value the struct ValidationState to compare
 * @return #GNUNET_OK if the address differs (keep going), #GNUNET_NO once
 *         a match has been stored in the closure's `vs`
 */
7553 static int
7555  const struct GNUNET_PeerIdentity *pid,
7556  void *value)
7557 {
7558  struct CheckKnownAddressContext *ckac = cls;
7559  struct ValidationState *vs = value;
7560 
7561  (void) pid;
7562  if (0 != strcmp (vs->address, ckac->address))
7563  return GNUNET_OK;
7564  ckac->vs = vs;
7565  return GNUNET_NO;
7566 }
7567 
7568 
/**
 * Forward declaration: this function is passed to GNUNET_SCHEDULER_add_at()
 * further down in this file before its definition appears.
 *
 * @param cls closure
 */
7574 static void
7575 validation_start_cb (void *cls);
7576 
7577 
/**
 * Set the next challenge time of validation state `vs` to @a new_time and
 * make sure the validation task is scheduled accordingly.
 *
 * Nothing happens if the time is unchanged.  Otherwise the state is put
 * into (or updated in) the validation heap.  Unless a validation task
 * exists and `vs` is not at the top of the heap, any existing task is
 * cancelled and a new one is scheduled at @a new_time plus a delta.
 *
 * NOTE(review): the function-name line (7586), the else-branch statement
 * (7598, presumably the heap cost update) and the first line of the delta
 * computation (7606) are missing from this listing.
 *
 * @param new_time new time for the next challenge
 */
7585 static void
7587  struct GNUNET_TIME_Absolute new_time)
7588 {
7589  struct GNUNET_TIME_Relative delta;
7590 
7591  if (new_time.abs_value_us == vs->next_challenge.abs_value_us)
7592  return; /* be lazy */
7593  vs->next_challenge = new_time;
7594  if (NULL == vs->hn)
7595  vs->hn =
7596  GNUNET_CONTAINER_heap_insert (validation_heap, vs, new_time.abs_value_us);
7597  else
  /* keep the current task if one exists and vs is not the heap's top entry */
7599  if ((vs != GNUNET_CONTAINER_heap_peek (validation_heap)) &&
7600  (NULL != validation_task))
7601  return;
7602  if (NULL != validation_task)
7603  GNUNET_SCHEDULER_cancel (validation_task);
7604  /* randomize a bit */
7605  delta.rel_value_us =
7607  MIN_DELAY_ADDRESS_VALIDATION.rel_value_us);
7608  new_time = GNUNET_TIME_absolute_add (new_time, delta);
7609  validation_task =
7610  GNUNET_SCHEDULER_add_at (new_time, &validation_start_cb, NULL);
7611 }
7612 
7613 
/**
 * Begin validating @a address for peer `pid`, unless a validation state
 * for that address already exists.
 *
 * If a state exists and is not currently valid (validated_until lies
 * before next_challenge), its challenge backoff is reduced; nothing else
 * is done in that case.  Otherwise a fresh struct ValidationState is
 * created with a copy of the address, registered in validation_map, and
 * its first challenge is scheduled for now via
 * update_next_challenge_time().
 *
 * NOTE(review): the function-name line (7621) and parts of several calls
 * (7630, 7640, 7643-7644, 7653, 7656, 7660, 7665-7666, 7670) are missing
 * from this listing.
 *
 * @param address the address to validate; copied with GNUNET_strdup
 */
7620 static void
7622  const char *address)
7623 {
7624  struct GNUNET_TIME_Absolute now;
7625  struct ValidationState *vs;
7626  struct CheckKnownAddressContext ckac = {.address = address, .vs = NULL};
7627 
  /* search the peer's existing validation states for this address */
7628  (void) GNUNET_CONTAINER_multipeermap_get_multiple (validation_map,
7629  pid,
7631  &ckac);
7632  if (NULL != (vs = ckac.vs))
7633  {
7634  /* if 'vs' is not currently valid, we need to speed up retrying the
7635  * validation */
7636  if (vs->validated_until.abs_value_us < vs->next_challenge.abs_value_us)
7637  {
7638  /* reduce backoff as we got a fresh advertisement */
7639  vs->challenge_backoff =
7641  GNUNET_TIME_relative_divide (vs->challenge_backoff,
7642  2));
7645  vs->challenge_backoff));
7646  }
7647  return;
7648  }
7649  now = GNUNET_TIME_absolute_get ();
7650  vs = GNUNET_new (struct ValidationState);
7651  vs->pid = *pid;
7652  vs->valid_until =
7654  vs->first_challenge_use = now;
7655  vs->validation_rtt = GNUNET_TIME_UNIT_FOREVER_REL;
7657  &vs->challenge,
7658  sizeof (vs->challenge));
7659  vs->address = GNUNET_strdup (address);
7661  "Starting address validation `%s' of peer %s using challenge %s\n",
7662  address,
7663  GNUNET_i2s (pid),
7664  GNUNET_sh2s (&vs->challenge.value));
7667  validation_map,
7668  &vs->pid,
7669  vs,
7671  update_next_challenge_time (vs, now);
7672 }
7673 
7674 
/**
 * PEERSTORE record callback for an incoming request (passed as closure):
 * treats the record value as a 0-terminated address string and starts
 * address validation for it with the request's peer identity.
 *
 * NOTE(review): the function-name line (7683) and the first line of the
 * log call (7692) are missing from this listing.
 * NOTE(review): @a record is dereferenced without a NULL check, whereas
 * backtalker_monotime_cb() in this file treats a NULL record as the end of
 * an iteration -- confirm this callback can never be invoked with NULL.
 *
 * @param record the record found
 * @param emsg error message, or NULL on success
 */
7682 static void
7684  const struct GNUNET_PEERSTORE_Record *record,
7685  const char *emsg)
7686 {
7687  struct IncomingRequest *ir = cls;
7688  const char *val;
7689 
7690  if (NULL != emsg)
7691  {
7693  "Got failure from PEERSTORE: %s\n",
7694  emsg);
7695  return;
7696  }
7697  val = record->value;
  /* value must be a non-empty, 0-terminated string */
7698  if ((0 == record->value_size) || ('\0' != val[record->value_size - 1]))
7699  {
7700  GNUNET_break (0);
7701  return;
7702  }
7703  start_address_validation (&ir->pid, (const char *) record->value);
7704 }
7705 
7706 
/**
 * Handle an address validation challenge received via a communicator.
 *
 * Challenges that travelled over DV (total_hops > 0) are rejected.
 * Otherwise a signed response echoing the challenge, the sender's time and
 * the expected address validity is built and routed back to the sender.
 * If no virtual link to the sender exists afterwards, address validation
 * is started for every queue of the neighbour, and -- unless one is
 * already pending for this peer -- an IncomingRequest with a PEERSTORE
 * watch is created.  The number of parallel requests is bounded by
 * #MAX_INCOMING_REQUEST.
 *
 * NOTE(review): the function-name line (7716), the declaration of `tvr`
 * (7721) and parts of several statements (7736, 7744, 7753, 7764,
 * 7791-7792) are missing from this listing.
 *
 * @param cls the struct CommunicatorMessageContext of this message
 * @param tvc the challenge message
 */
7715 static void
7717  void *cls,
7718  const struct TransportValidationChallengeMessage *tvc)
7719 {
7720  struct CommunicatorMessageContext *cmc = cls;
7722  struct VirtualLink *vl;
7723  struct GNUNET_TIME_RelativeNBO validity_duration;
7724  struct IncomingRequest *ir;
7725  struct Neighbour *n;
7726  struct GNUNET_PeerIdentity sender;
7727 
7728  /* DV-routed messages are not allowed for validation challenges */
7729  if (cmc->total_hops > 0)
7730  {
7731  GNUNET_break_op (0);
7732  finish_cmc_handling (cmc);
7733  return;
7734  }
7735  validity_duration = cmc->im.expected_address_validity;
7737  "Received address validation challenge %s\n",
7738  GNUNET_sh2s (&tvc->challenge.value));
7739  /* If we have a virtual link, we use this mechanism to signal the
7740  size of the flow control window, and to allow the sender
7741  to ask for increases. If for us the virtual link is still down,
7742  we will always give a window size of zero. */
7743  tvr.header.type =
7745  tvr.header.size = htons (sizeof (tvr));
7746  tvr.reserved = htonl (0);
7747  tvr.challenge = tvc->challenge;
7748  tvr.origin_time = tvc->sender_time;
7749  tvr.validity_duration = validity_duration;
7750  {
7751  /* create signature */
7752  struct TransportValidationPS tvp =
7754  .purpose.size = htonl (sizeof (tvp)),
7755  .validity_duration = validity_duration,
7756  .challenge = tvc->challenge};
7757 
7758  GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (GST_my_private_key,
7759  &tvp.purpose,
7760  &tvr.signature));
7761  }
7762  route_control_message_without_fc (&cmc->im.sender,
7763  &tvr.header,
  /* copy the sender first: cmc is handed to finish_cmc_handling() next
     (presumably it must not be used afterwards -- confirm) */
7765  sender = cmc->im.sender;
7766  finish_cmc_handling (cmc);
7767  vl = lookup_virtual_link (&sender);
7768  if (NULL != vl)
7769  return;
7770 
7771  /* For us, the link is still down, but we need bi-directional
7772  connections (for flow-control and for this to be useful for
7773  CORE), so we must try to bring the link up! */
7774 
7775  /* (1) Check existing queues, if any, we may be lucky! */
7776  n = lookup_neighbour (&sender);
7777  if (NULL != n)
7778  for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
7779  start_address_validation (&sender, q->address);
7780  /* (2) Also try to see if we have addresses in PEERSTORE for this peer
7781  we could use */
7782  for (ir = ir_head; NULL != ir; ir = ir->next)
7783  if (0 == GNUNET_memcmp (&ir->pid, &sender))
7784  return; /* we are already trying */
7785  ir = GNUNET_new (struct IncomingRequest);
7786  ir->pid = sender;
7787  GNUNET_CONTAINER_DLL_insert (ir_head, ir_tail, ir);
7788  ir->wc = GNUNET_PEERSTORE_watch (peerstore,
7789  "transport",
7790  &ir->pid,
7793  ir);
7794  ir_total++;
7795  /* Bound attempts we do in parallel here, might otherwise get excessive */
  /* NOTE(review): entries are dropped from ir_head; whether that is the
     oldest or the newest request depends on where
     GNUNET_CONTAINER_DLL_insert places new elements -- confirm. */
7796  while (ir_total > MAX_INCOMING_REQUEST)
7797  free_incoming_request (ir_head);
7798 }
7799 
7800 
/* Body of a struct whose opening `struct ...` line and member declarations
   are all missing from this listing; presumably struct
   CheckKnownChallengeContext, which the callback that follows uses with
   the members `challenge` and `vs`. */
7805 {
7810 
7815 };
7816 
7817 
/**
 * Map-iterator style callback: looks for the validation state whose
 * challenge equals the one referenced by the struct
 * CheckKnownChallengeContext closure.
 *
 * NOTE(review): the line with the function name and the closure parameter
 * is missing from this listing.
 *
 * @param pid peer identity of the entry, unused
 * @param value the struct ValidationState to compare
 * @return #GNUNET_OK if the challenge differs (keep going), #GNUNET_NO
 *         once a match has been stored in the closure's `vs`
 */
7827 static int
7829  const struct GNUNET_PeerIdentity *pid,
7830  void *value)
7831 {
7832  struct CheckKnownChallengeContext *ckac = cls;
7833  struct ValidationState *vs = value;
7834 
7835  (void) pid;
7836  if (0 != GNUNET_memcmp (&vs->challenge, ckac->challenge))
7837  return GNUNET_OK;
7838  ckac->vs = vs;
7839  return GNUNET_NO;
7840 }
7842 
7850 static void
7851 peerstore_store_validation_cb (void *cls, int success)
7852 {
7853  struct ValidationState *vs = cls;
7854 
7855  vs->sc = NULL;
7856  if (GNUNET_YES == success)
7857  return;
7858  GNUNET_STATISTICS_update (GST_stats,
7859  "# Peerstore failed to store foreign address",
7860  1,
7861  GNUNET_NO);
7862 }
7863 
7864 
7872 static struct Queue *
7873 find_queue (const struct GNUNET_PeerIdentity *pid, const char *address)
7874 {
7875  struct Neighbour *n;
7876 
7877  n = lookup_neighbour (pid);
7878  if (NULL == n)
7879  return NULL;
7880  for (struct Queue *pos = n->queue_head; NULL != pos;
7881  pos = pos->next_neighbour)
7882  {
7883  if (0 == strcmp (pos->address, address))
7884  return pos;
7885  }
7886  return NULL;
7887 }
7888 
7889 
7898 static void
7900  void *cls,
7901  const struct TransportValidationResponseMessage *tvr)
7902 {
7903  struct CommunicatorMessageContext *cmc = cls;
7904  struct ValidationSt