GNUnet 0.22.2
gnunet-service-transport.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
75#include "platform.h"
76#include "gnunet_util_lib.h"
80#include "gnunet_nat_service.h"
82#include "gnunet_signatures.h"
83#include "transport.h"
84
88#define RING_BUFFER_SIZE 16
89
93#define MAX_FC_RETRANSMIT_COUNT 1000
94
99#define MAX_CUMMULATIVE_ACKS 64
100
113#define FC_NO_CHANGE_REPLY_PROBABILITY 8
114
119#define IN_PACKET_SIZE_WITHOUT_MTU 128
120
125#define GOODPUT_AGING_SLOTS 4
126
131#define DEFAULT_WINDOW_SIZE (128 * 1024)
132
141#define MAX_INCOMING_REQUEST 16
142
147#define MAX_DV_DISCOVERY_SELECTION 16
148
157#define RECV_WINDOW_SIZE 4
158
166#define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
167
171#define MAX_DV_HOPS_ALLOWED 16
172
177#define MAX_DV_LEARN_PENDING 64
178
182#define MAX_DV_PATHS_TO_TARGET 3
183
189#define DELAY_WARN_THRESHOLD \
190 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
191
196#define DV_FORWARD_TIMEOUT \
197 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
198
202#define DEFAULT_ACK_WAIT_DURATION \
203 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
204
210#define DV_QUALITY_RTT_THRESHOLD \
211 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
212
217#define DV_PATH_VALIDITY_TIMEOUT \
218 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
219
224#define BACKCHANNEL_INACTIVITY_TIMEOUT \
225 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
226
231#define DV_PATH_DISCOVERY_FREQUENCY \
232 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
233
237#define EPHEMERAL_VALIDITY \
238 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
239
243#define REASSEMBLY_EXPIRATION \
244 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
245
250#define FAST_VALIDATION_CHALLENGE_FREQ \
251 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
252
256#define MAX_VALIDATION_CHALLENGE_FREQ \
257 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
258
264#define ACK_CUMMULATOR_TIMEOUT \
265 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
266
271#define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
272
277#define DV_LEARN_QUALITY_THRESHOLD 100
278
282#define MAX_ADDRESS_VALID_UNTIL \
283 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
284
288#define ADDRESS_VALIDATION_LIFETIME \
289 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
290
297#define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
298
305#define VALIDATION_RTT_BUFFER_FACTOR 3
306
313#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
314
320#define QUEUE_LENGTH_LIMIT 32
321
325#define QUEUE_ENTRY_TIMEOUT \
326 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
327
332#define RTT_DIFF \
333 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
334
336
341{
347};
348
349
354{
359};
360
365{
370
371 /* Followed by *another* message header which is the message to
372 the communicator */
373
374 /* Followed by a 0-terminated name of the communicator */
375};
376
377
382{
387
403
408
414};
415
416
422{
427
433
445
446 /* Followed by a `struct GNUNET_MessageHeader` with a message
447 for the target peer */
448};
449
450
456{
461
469
476};
477
478
483{
491
496};
497
498
507{
512
518
519 /* followed by any number of `struct TransportCummulativeAckPayloadP`
520 messages providing ACKs */
521};
522
523
528{
533
538
543
552
558};
559
560
579{
584
598
603};
604
605
623{
628
633
638
643};
644
645
651{
656
662};
663
664
679{
684
690
700
707
721
727
732
737
738 /* Followed by @e num_hops `struct DVPathEntryP` values,
739 excluding the initiator of the DV trace; the last entry is the
740 current sender; the current peer must not be included. */
741};
742
743
767{
772
776 unsigned int without_fc;
777
785
792
798
805
812
819
820 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
821 excluding the @e origin and the current peer, the last must be
822 the ultimate target; if @e num_hops is zero, the receiver of this
823 message is the ultimate target. */
824
825 /* Followed by encrypted, variable-size payload, which
826 must begin with a `struct TransportDVBoxPayloadP` */
827
828 /* Followed by the actual message, which itself must not be a
829 a DV_LEARN or DV_BOX message! */
830};
831
832
838{
843
848
853
859};
860
861
867{
872
878
883};
884
885
891{
896
901
907
912
917 struct GNUNET_TIME_AbsoluteNBO origin_time;
918
923 struct GNUNET_TIME_RelativeNBO validity_duration;
924};
925
927{
931 unsigned int address_length;
932
933 /* Followed by @e address_length bytes of the address. */
934};
935
945{
950
959
965
972
982
992
997
1001 unsigned int sync_ready;
1002
1007
1012
1013 /* Followed by @e number_of_addresses struct TransportGlobalNattedAddress. */
1014};
1015
1017
1018
1023{
1028
1033
1038
1043
1047 CT_APPLICATION = 4
1049
1050
1056{
1061
1066
1071
1076
1082 RMO_REDUNDANT = 4
1084
1085
1090{
1095
1100
1105
1111};
1112
1113
1119{
1123 uint64_t bytes_sent;
1124
1130};
1131
1132
1137{
1142
1148
1153 unsigned int last_age;
1154};
1155
1156
1160struct TransportClient;
1161
1165struct Neighbour;
1166
1171struct DistanceVector;
1172
1177struct Queue;
1178
1182struct PendingMessage;
1183
1187struct DistanceVectorHop;
1188
1197struct VirtualLink;
1198
1199
1205{
1211
1217
1222
1227
1232
1237 uint16_t total_hops;
1238
1242 unsigned int continue_send;
1243};
1244
1245
1250{
1255
1260};
1261
1262
1267{
1272
1277
1282
1286 uint16_t size;
1287
1294 uint16_t isize;
1295};
1296
1297
1302{
1308
1313
1318
1326 uint8_t *bitfield;
1327
1332
1338
1342 uint16_t msg_size;
1343
1348 uint16_t msg_missing;
1349
1350 /* Followed by @e msg_size bytes of the (partially) defragmented original
1351 * message */
1352
1353 /* Followed by @e bitfield data */
1354};
1355
1356
1366{
1371
1378
1385
1390
1396
1402
1407
1412
1417
1422
1430
1436
1441
1446
1451
1456 unsigned int confirmed;
1457
1461 struct Neighbour *n;
1462
1467
1474
1481
1490
1496
1502
1507
1513
1522
1530
1537
1546
1559
1565
1572
1583
1588 uint32_t fc_seq_gen;
1589
1595 uint32_t last_fc_seq;
1596
1609
1614};
1615
1616
1621{
1627
1633
1640
1647
1654
1661
1668
1675
1680
1686
1692
1697 struct Queue *queue;
1698
1703
1708
1712 unsigned int num_send;
1713};
1714
1715
1720{
1725
1730
1735
1740
1745
1750
1755
1760
1767
1773
1782
1787
1793 unsigned int distance;
1794};
1795
1796
1802{
1807
1812
1817
1822
1828
1834
1839
1844
1849
1854};
1855
1856
1867{
1872
1877
1881 struct Queue *queue;
1882
1887
1891 uint64_t mid;
1892
1897};
1898
1899
1904struct Queue
1905{
1910
1915
1920
1925
1930
1935
1940
1945
1950
1955
1959 const char *address;
1960
1964 unsigned int unlimited_length;
1965
1971
1980
1985
1991
1996 uint64_t mid_gen;
1997
2001 uint32_t qid;
2002
2006 uint32_t mtu;
2007
2012
2017
2021 unsigned int queue_length;
2022
2026 uint64_t q_capacity;
2027
2031 uint32_t priority;
2032
2037
2042
2047 int idle;
2048
2053};
2054
2055
2060{
2065
2071
2077
2082
2087
2093
2099
2105
2111
2117
2122
2127
2132
2137};
2138
2139
2145{
2150
2155
2160
2165};
2166
2167
2171struct PeerRequest
2172{
2177
2182
2187
2194
2199};
2200
2201
2206{
2211
2216
2221
2225 PMT_DV_BOX = 3
2227
2228
2255struct PendingMessage
2256{
2261
2266
2271
2276
2282
2288
2293
2298
2304
2310
2315
2325
2330
2335
2340
2345
2350
2355
2361
2367
2372
2378
2383
2387 uint16_t bytes_msg;
2388
2392 uint16_t frag_off;
2393
2398
2403
2407 uint16_t frag_count;
2408
2413
2414 /* Followed by @e bytes_msg to transmit */
2415};
2416
2417
2422{
2428
2433};
2434
2435
2441{
2446
2451
2458
2463
2469 uint32_t ack_counter;
2470
2474 unsigned int num_acks;
2475};
2476
2477
2482{
2487
2492
2497
2502
2506 const char *address;
2507
2512
2517
2522
2527
2533
2537 uint32_t aid;
2538
2543};
2544
2545
2550{
2555
2560
2565
2570
2575
2576 union
2577 {
2581 struct
2582 {
2588
2594
2598 struct
2599 {
2606
2612
2613
2617 struct
2618 {
2624
2629
2634
2640
2646
2653
2658
2663
2668
2670
2674 struct
2675 {
2683};
2684
2685
2691{
2697
2705
2711
2718 struct GNUNET_TIME_Absolute first_challenge_use;
2719
2726 struct GNUNET_TIME_Absolute last_challenge_use;
2727
2735 struct GNUNET_TIME_Absolute next_challenge;
2736
2745 struct GNUNET_TIME_Relative challenge_backoff;
2746
2751 struct GNUNET_TIME_Relative validation_rtt;
2752
2760 struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2761
2765 struct GNUNET_HashCode hc;
2766
2770 struct GNUNET_SCHEDULER_Task *revalidation_task;
2771
2775 char *address;
2776
2782 struct GNUNET_CONTAINER_HeapNode *hn;
2783
2789
2795 uint32_t last_window_consum_limit;
2796
2801 int awaiting_queue;
2802};
2803
2804
2812{
2817
2822
2827
2832
2838
2843
2849
2855
2861};
2862
2867
2871static unsigned int ring_buffer_head;
2872
2876static unsigned int is_ring_buffer_full;
2877
2882
2886static unsigned int ring_buffer_dv_head;
2887
2891static unsigned int is_ring_buffer_dv_full;
2892
2897
2902
2907
2912
2917
2922
2927
2933
2939
2945
2951
2957
2963
2969
2975
2980
2984static struct LearnLaunchEntry *lle_head = NULL;
2985
2989static struct LearnLaunchEntry *lle_tail = NULL;
2990
2997
3002
3007
3012
3017
3024
3029
3033static unsigned int ir_total;
3034
3038static unsigned long long logging_uuid_gen;
3039
3044
3054
3059static int in_shutdown;
3060
3065
3067
3069
3080static unsigned int
3082{
3083 struct GNUNET_TIME_Absolute now;
3084
3085 now = GNUNET_TIME_absolute_get ();
3086 return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
3087}
3088
3089
3095static void
3097{
3099 GNUNET_assert (ir_total > 0);
3100 ir_total--;
3101 if (NULL != ir->nc)
3103 ir->nc = NULL;
3104 GNUNET_free (ir);
3105}
3106
3107
3113static void
3115{
3116 struct Queue *q = pa->queue;
3117 struct PendingMessage *pm = pa->pm;
3118 struct DistanceVectorHop *dvh = pa->dvh;
3119
3121 "free_pending_acknowledgement\n");
3122 if (NULL != q)
3123 {
3124 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
3125 pa->queue = NULL;
3126 }
3127 if (NULL != pm)
3128 {
3130 "remove pa from message\n");
3132 "remove pa from message %" PRIu64 "\n",
3133 pm->logging_uuid);
3135 "remove pa from message %u\n",
3136 pm->pmt);
3138 "remove pa from message %s\n",
3140 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3141 pa->pm = NULL;
3142 }
3143 if (NULL != dvh)
3144 {
3145 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
3146 pa->queue = NULL;
3147 }
3150 &pa->ack_uuid.value,
3151 pa));
3152 GNUNET_free (pa);
3153}
3154
3155
3164static void
3166{
3167 struct PendingMessage *frag;
3168
3169 while (NULL != (frag = root->head_frag))
3170 {
3171 struct PendingAcknowledgement *pa;
3172
3173 free_fragment_tree (frag);
3174 while (NULL != (pa = frag->pa_head))
3175 {
3176 GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
3177 pa->pm = NULL;
3178 }
3179 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
3180 if (NULL != frag->qe)
3181 {
3182 GNUNET_assert (frag == frag->qe->pm);
3183 frag->qe->pm = NULL;
3184 }
3186 "Free frag %p\n",
3187 frag);
3188 GNUNET_free (frag);
3189 }
3190}
3191
3192
3200static void
3202{
3203 struct TransportClient *tc = pm->client;
3204 struct VirtualLink *vl = pm->vl;
3205 struct PendingAcknowledgement *pa;
3206
3208 "Freeing pm %p\n",
3209 pm);
3210 if (NULL != tc)
3211 {
3213 tc->details.core.pending_msg_head,
3214 tc->details.core.pending_msg_tail,
3215 pm);
3216 }
3217 if ((NULL != vl) && (NULL == pm->frag_parent))
3218 {
3220 "Removing pm %" PRIu64 "\n",
3221 pm->logging_uuid);
3223 vl->pending_msg_head,
3224 vl->pending_msg_tail,
3225 pm);
3226 }
3227 else if (NULL != pm->frag_parent && PMT_DV_BOX != pm->pmt)
3228 {
3229 struct PendingMessage *root = pm->frag_parent;
3230
3231 while (NULL != root->frag_parent && PMT_DV_BOX != root->pmt)
3232 root = root->frag_parent;
3233
3234 root->frag_count--;
3235 }
3236 while (NULL != (pa = pm->pa_head))
3237 {
3238 if (NULL == pa)
3240 "free pending pa null\n");
3241 if (NULL == pm->pa_tail)
3243 "free pending pa_tail null\n");
3244 if (NULL == pa->prev_pa)
3246 "free pending pa prev null\n");
3247 if (NULL == pa->next_pa)
3249 "free pending pa next null\n");
3250 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3251 pa->pm = NULL;
3252 }
3253
3255 if (NULL != pm->qe)
3256 {
3257 GNUNET_assert (pm == pm->qe->pm);
3258 pm->qe->pm = NULL;
3259 }
3260 if (NULL != pm->bpm)
3261 {
3262 free_fragment_tree (pm->bpm);
3263 if (NULL != pm->bpm->qe)
3264 {
3265 struct QueueEntry *qe = pm->bpm->qe;
3266
3267 qe->pm = NULL;
3268 }
3269 GNUNET_free (pm->bpm);
3270 }
3271
3272 GNUNET_free (pm);
3274 "Freeing pm done\n");
3275}
3276
3277
3283static void
3285{
3286 struct VirtualLink *vl = rc->virtual_link;
3287
3291 rc->msg_uuid.uuid,
3292 rc));
3293 GNUNET_free (rc);
3294}
3295
3296
3302static void
3304{
3305 struct VirtualLink *vl = cls;
3306 struct ReassemblyContext *rc;
3307
3308 vl->reassembly_timeout_task = NULL;
3309 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3310 {
3312 .rel_value_us)
3313 {
3315 continue;
3316 }
3321 vl);
3322 return;
3323 }
3324}
3325
3326
3335static int
3336free_reassembly_cb (void *cls, uint32_t key, void *value)
3337{
3338 struct ReassemblyContext *rc = value;
3339
3340 (void) cls;
3341 (void) key;
3343 return GNUNET_OK;
3344}
3345
3346
3352static void
3354{
3355 struct PendingMessage *pm;
3356 struct CoreSentContext *csc;
3357
3359 "free virtual link %p\n",
3360 vl);
3361
3362 if (NULL != vl->reassembly_map)
3363 {
3366 NULL);
3368 vl->reassembly_map = NULL;
3370 vl->reassembly_heap = NULL;
3371 }
3372 if (NULL != vl->reassembly_timeout_task)
3373 {
3376 }
3377 while (NULL != (pm = vl->pending_msg_head))
3381 if (NULL != vl->visibility_task)
3382 {
3384 vl->visibility_task = NULL;
3385 }
3386 if (NULL != vl->fc_retransmit_task)
3387 {
3389 vl->fc_retransmit_task = NULL;
3390 }
3391 while (NULL != (csc = vl->csc_head))
3392 {
3394 GNUNET_assert (vl == csc->vl);
3395 csc->vl = NULL;
3396 }
3397 GNUNET_break (NULL == vl->n);
3398 GNUNET_break (NULL == vl->dv);
3399 GNUNET_free (vl);
3400}
3401
3402
3408static void
3410{
3411 if (NULL != vs->revalidation_task)
3412 {
3413 GNUNET_SCHEDULER_cancel (vs->revalidation_task);
3414 vs->revalidation_task = NULL;
3415 }
3416 /*memcpy (&hkey,
3417 &hc,
3418 sizeof (hkey));*/
3420 "Remove key %s for address %s map size %u contains %u during freeing state\n",
3421 GNUNET_h2s (&vs->hc),
3422 vs->address,
3425 &vs->hc));
3428 GNUNET_YES ==
3431 vs->hn = NULL;
3432 if (NULL != vs->sc)
3433 {
3435 "store cancel\n");
3437 vs->sc = NULL;
3438 }
3439 GNUNET_free (vs->address);
3440 GNUNET_free (vs);
3441}
3442
3443
3450static struct Neighbour *
3452{
3454}
3455
3456
3463static struct VirtualLink *
3465{
3467}
3468
3469
3474{
3481
3486
3491
3496
3501};
3502
3503
3512static void
3514{
3515 struct Neighbour *n = dvh->next_hop;
3516 struct DistanceVector *dv = dvh->dv;
3517 struct PendingAcknowledgement *pa;
3518
3519 while (NULL != (pa = dvh->pa_head))
3520 {
3522 pa->dvh = NULL;
3523 }
3524 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3526 GNUNET_free (dvh);
3527}
3528
3529
3536static void
3537check_link_down (void *cls);
3538
3539
3545static void
3547{
3549 "Informing CORE clients about disconnect from %s\n",
3550 GNUNET_i2s (pid));
3551 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3552 {
3553 struct GNUNET_MQ_Envelope *env;
3554 struct DisconnectInfoMessage *dim;
3555
3556 if (CT_CORE != tc->type)
3557 continue;
3559 dim->peer = *pid;
3560 GNUNET_MQ_send (tc->mq, env);
3561 }
3562}
3563
3564
3571static void
3573{
3574 struct DistanceVectorHop *dvh;
3575 struct VirtualLink *vl;
3576
3577 while (NULL != (dvh = dv->dv_head))
3579
3581 GNUNET_YES ==
3583 if (NULL != (vl = dv->vl))
3584 {
3585 GNUNET_assert (dv == vl->dv);
3586 vl->dv = NULL;
3587 if (NULL == vl->n)
3588 {
3590 free_virtual_link (vl);
3591 }
3592 else
3593 {
3596 }
3597 dv->vl = NULL;
3598 }
3599
3600 if (NULL != dv->timeout_task)
3601 {
3603 dv->timeout_task = NULL;
3604 }
3605 GNUNET_free (dv->km);
3606 GNUNET_free (dv);
3607}
3608
3609
3623static void
3625 const struct GNUNET_PeerIdentity *peer,
3626 const char *address,
3628 const struct MonitorEvent *me)
3629{
3630 struct GNUNET_MQ_Envelope *env;
3632 size_t addr_len = strlen (address) + 1;
3633
3635 addr_len,
3637 md->nt = htonl ((uint32_t) nt);
3638 md->peer = *peer;
3639 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3640 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3641 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3642 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3643 md->cs = htonl ((uint32_t) me->cs);
3644 md->num_msg_pending = htonl (me->num_msg_pending);
3645 md->num_bytes_pending = htonl (me->num_bytes_pending);
3646 memcpy (&md[1], address, addr_len);
3647 GNUNET_MQ_send (tc->mq, env);
3648}
3649
3650
3660static void
3662 const char *address,
3664 const struct MonitorEvent *me)
3665{
3666 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3667 {
3668 if (CT_MONITOR != tc->type)
3669 continue;
3670 if (tc->details.monitor.one_shot)
3671 continue;
3672 if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3673 (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3674 continue;
3676 }
3677}
3678
3679
3689static void *
3691 struct GNUNET_SERVICE_Client *client,
3692 struct GNUNET_MQ_Handle *mq)
3693{
3694 struct TransportClient *tc;
3695
3696 (void) cls;
3697 tc = GNUNET_new (struct TransportClient);
3698 tc->client = client;
3699 tc->mq = mq;
3702 "Client %p of type %u connected\n",
3703 tc,
3704 tc->type);
3705 return tc;
3706}
3707
3708
3709static enum GNUNET_GenericReturnValue
3711 const struct GNUNET_PeerIdentity *pid,
3712 void *value)
3713{
3714 struct TransportGlobalNattedAddress *tgna = value;
3715 (void) cls;
3716
3717 GNUNET_free (tgna);
3718
3719 return GNUNET_OK;
3720}
3721
3722
3729static void
3730free_neighbour (struct Neighbour *neighbour,
3731 enum GNUNET_GenericReturnValue drop_link)
3732{
3733 struct DistanceVectorHop *dvh;
3734 struct VirtualLink *vl;
3735
3736 GNUNET_assert (NULL == neighbour->queue_head);
3739 &neighbour->pid,
3740 neighbour));
3742 "Freeing neighbour\n");
3745 NULL);
3747 while (NULL != (dvh = neighbour->dv_head))
3748 {
3749 struct DistanceVector *dv = dvh->dv;
3750
3752 if (NULL == dv->dv_head)
3753 free_dv_route (dv);
3754 }
3755 if (NULL != neighbour->get)
3756 {
3758 neighbour->get = NULL;
3759 }
3760 if (NULL != neighbour->sc)
3761 {
3763 "store cancel\n");
3764 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3765 neighbour->sc = NULL;
3766 }
3767 if (NULL != (vl = neighbour->vl))
3768 {
3769 GNUNET_assert (neighbour == vl->n);
3770 vl->n = NULL;
3771 if ((GNUNET_YES == drop_link) || (NULL == vl->dv))
3772 {
3775 }
3776 else
3777 {
3780 }
3781 neighbour->vl = NULL;
3782 }
3783 GNUNET_free (neighbour);
3784}
3785
3786
3793static void
3795 const struct GNUNET_PeerIdentity *pid)
3796{
3797 struct GNUNET_MQ_Envelope *env;
3798 struct ConnectInfoMessage *cim;
3799
3800 GNUNET_assert (CT_CORE == tc->type);
3802 cim->id = *pid;
3803 GNUNET_MQ_send (tc->mq, env);
3804}
3805
3806
3812static void
3814{
3816 "Informing CORE clients about connection to %s\n",
3817 GNUNET_i2s (pid));
3818 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3819 {
3820 if (CT_CORE != tc->type)
3821 continue;
3823 }
3824}
3825
3826
3834static void
3835transmit_on_queue (void *cls);
3836
3837
3841static unsigned int
3843{
3844 for (struct Queue *s = queue_head; NULL != s;
3845 s = s->next_client)
3846 {
3847 if (s->tc->details.communicator.address_prefix !=
3848 queue->tc->details.communicator.address_prefix)
3849 {
3851 "queue address %s qid %u compare with queue: address %s qid %u\n",
3852 queue->address,
3853 queue->qid,
3854 s->address,
3855 s->qid);
3856 if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3857 (QUEUE_LENGTH_LIMIT > s->queue_length) )
3858 return GNUNET_YES;
3860 "Lower prio\n");
3861 }
3862 }
3863 return GNUNET_NO;
3864}
3865
3866
3874static void
3876 struct Queue *queue,
3878{
3880
3881 if (queue->validated_until.abs_value_us < now.abs_value_us)
3882 return;
3884 queue->tc->details.communicator.
3885 queue_head))
3886 return;
3887
3888 if (queue->tc->details.communicator.total_queue_length >=
3890 {
3892 "Transmission on queue %s (QID %u) throttled due to communicator queue limit\n",
3893 queue->address,
3894 queue->qid);
3896 GST_stats,
3897 "# Transmission throttled due to communicator queue limit",
3898 1,
3899 GNUNET_NO);
3900 queue->idle = GNUNET_NO;
3901 return;
3902 }
3903 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3904 {
3906 "Transmission on queue %s (QID %u) throttled due to communicator queue length limit\n",
3907 queue->address,
3908 queue->qid);
3910 "# Transmission throttled due to queue queue limit",
3911 1,
3912 GNUNET_NO);
3913 queue->idle = GNUNET_NO;
3914 return;
3915 }
3916 if (0 == queue->q_capacity)
3917 {
3919 "Transmission on queue %s (QID %u) throttled due to communicator message has capacity %"
3920 PRIu64 ".\n",
3921 queue->address,
3922 queue->qid,
3923 queue->q_capacity);
3925 "# Transmission throttled due to message queue capacity",
3926 1,
3927 GNUNET_NO);
3928 queue->idle = GNUNET_NO;
3929 return;
3930 }
3931 /* queue might indeed be ready, schedule it */
3932 if (NULL != queue->transmit_task)
3933 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3934 queue->transmit_task =
3936 queue);
3938 "Considering transmission on queue `%s' QID %llu to %s\n",
3939 queue->address,
3940 (unsigned long long) queue->qid,
3941 GNUNET_i2s (&queue->neighbour->pid));
3942}
3943
3944
3951static void
3953{
3954 struct VirtualLink *vl = cls;
3955 struct DistanceVector *dv = vl->dv;
3956 struct Neighbour *n = vl->n;
3957 struct GNUNET_TIME_Absolute dvh_timeout;
3958 struct GNUNET_TIME_Absolute q_timeout;
3959
3961 "Checking if link is down\n");
3962 vl->visibility_task = NULL;
3963 dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3964 if (NULL != dv)
3965 {
3966 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3967 pos = pos->next_dv)
3968 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3969 pos->path_valid_until);
3970 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3971 {
3972 vl->dv->vl = NULL;
3973 vl->dv = NULL;
3974 }
3975 }
3976 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3977 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3978 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3979 if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3980 {
3981 vl->n->vl = NULL;
3982 vl->n = NULL;
3983 }
3984 if ((NULL == vl->n) && (NULL == vl->dv))
3985 {
3987 free_virtual_link (vl);
3988 return;
3989 }
3990 vl->visibility_task =
3991 GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3993 vl);
3994}
3995
3996
4002static void
4004{
4005 struct Neighbour *neighbour = queue->neighbour;
4006 struct TransportClient *tc = queue->tc;
4007 struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
4009 struct QueueEntry *qe;
4010 int maxxed;
4011 struct PendingAcknowledgement *pa;
4012 struct VirtualLink *vl;
4013
4015 "Cleaning up queue %u\n", queue->qid);
4016 if (NULL != queue->mo)
4017 {
4019 queue->mo = NULL;
4020 }
4021 if (NULL != queue->transmit_task)
4022 {
4023 GNUNET_SCHEDULER_cancel (queue->transmit_task);
4024 queue->transmit_task = NULL;
4025 }
4026 while (NULL != (pa = queue->pa_head))
4027 {
4028 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
4029 pa->queue = NULL;
4030 }
4031
4033 neighbour->queue_head,
4034 neighbour->queue_tail,
4035 queue);
4037 tc->details.communicator.queue_head,
4038 tc->details.communicator.queue_tail,
4039 queue);
4041 tc->details.communicator.total_queue_length);
4043 "Cleaning up queue with length %u\n",
4044 queue->queue_length);
4045 while (NULL != (qe = queue->queue_head))
4046 {
4047 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
4048 queue->queue_length--;
4049 tc->details.communicator.total_queue_length--;
4050 if (NULL != qe->pm)
4051 {
4052 GNUNET_assert (qe == qe->pm->qe);
4053 qe->pm->qe = NULL;
4054 }
4055 GNUNET_free (qe);
4056 }
4057 GNUNET_assert (0 == queue->queue_length);
4058 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
4059 tc->details.communicator.total_queue_length))
4060 {
4061 /* Communicator dropped below threshold, resume all _other_ queues */
4063 GST_stats,
4064 "# Transmission throttled due to communicator queue limit",
4065 -1,
4066 GNUNET_NO);
4067 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
4068 s = s->next_client)
4070 s,
4072 }
4073 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
4075
4076 vl = lookup_virtual_link (&neighbour->pid);
4077 if ((NULL != vl) && (neighbour == vl->n))
4078 {
4080 check_link_down (vl);
4081 }
4082 if (NULL == neighbour->queue_head)
4083 {
4084 free_neighbour (neighbour, GNUNET_NO);
4085 }
4086}
4087
4088
4094static void
4096{
4097 struct TransportClient *tc = ale->tc;
4098
4099 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
4100 tc->details.communicator.addr_tail,
4101 ale);
4102 if (NULL != ale->sc)
4103 {
4105 "store cancel\n");
4107 ale->sc = NULL;
4108 }
4109 if (NULL != ale->st)
4110 {
4112 ale->st = NULL;
4113 }
4114 if (NULL != ale->signed_address)
4116 GNUNET_free (ale);
4117}
4118
4119
4128static int
4130 const struct GNUNET_PeerIdentity *pid,
4131 void *value)
4132{
4133 struct TransportClient *tc = cls;
4134 struct PeerRequest *pr = value;
4135
4136 if (NULL != pr->nc)
4138 pr->nc = NULL;
4140 GNUNET_YES ==
4141 GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
4142 pid,
4143 pr));
4144 GNUNET_free (pr);
4145
4146 return GNUNET_OK;
4147}
4148
4149
4150static void
4151do_shutdown (void *cls);
4152
4161static void
4163 struct GNUNET_SERVICE_Client *client,
4164 void *app_ctx)
4165{
4166 struct TransportClient *tc = app_ctx;
4167
4168 (void) cls;
4169 (void) client;
4171 switch (tc->type)
4172 {
4173 case CT_NONE:
4175 "Unknown Client %p disconnected, cleaning up.\n",
4176 tc);
4177 break;
4178
4179 case CT_CORE: {
4180 struct PendingMessage *pm;
4182 "CORE Client %p disconnected, cleaning up.\n",
4183 tc);
4184
4185
4186 while (NULL != (pm = tc->details.core.pending_msg_head))
4187 {
4189 tc->details.core.pending_msg_head,
4190 tc->details.core.pending_msg_tail,
4191 pm);
4192 pm->client = NULL;
4193 }
4194 }
4195 break;
4196
4197 case CT_MONITOR:
4199 "MONITOR Client %p disconnected, cleaning up.\n",
4200 tc);
4201
4202 break;
4203
4204 case CT_COMMUNICATOR: {
4205 struct Queue *q;
4206 struct AddressListEntry *ale;
4207
4209 "COMMUNICATOR Client %p disconnected, cleaning up.\n",
4210 tc);
4211
4212 if (NULL != tc->details.communicator.free_queue_entry_task)
4214 tc->details.communicator.free_queue_entry_task);
4215 while (NULL != (q = tc->details.communicator.queue_head))
4216 free_queue (q);
4217 while (NULL != (ale = tc->details.communicator.addr_head))
4219 GNUNET_free (tc->details.communicator.address_prefix);
4220 }
4221 break;
4222
4223 case CT_APPLICATION:
4225 "APPLICATION Client %p disconnected, cleaning up.\n",
4226 tc);
4227
4228 GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
4230 tc);
4231 GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
4232 break;
4233 }
4234 GNUNET_free (tc);
4235 if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
4236 {
4238 "Our last client disconnected\n");
4239 do_shutdown (cls);
4240 }
4241}
4242
4243
4253static int
4255 const struct GNUNET_PeerIdentity *pid,
4256 void *value)
4257{
4258 struct TransportClient *tc = cls;
4259 struct VirtualLink *vl = value;
4260
4261 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4262 return GNUNET_OK;
4263
4265 "Telling new CORE client about existing connection to %s\n",
4266 GNUNET_i2s (pid));
4268 return GNUNET_OK;
4269}
4270
4271
4277static void
4279 unsigned
4280 int free_cmc);
4281
4282static enum GNUNET_GenericReturnValue
4284 const struct GNUNET_PeerIdentity *pid,
4285 void *value)
4286{
4287 struct VirtualLink *vl = value;
4288 struct CommunicatorMessageContext *cmc;
4289
4290 /* resume communicators */
4291 while (NULL != (cmc = vl->cmc_tail))
4292 {
4294 if (GNUNET_NO == cmc->continue_send)
4296 }
4297 return GNUNET_OK;
4298}
4299
4300
4309static void
4310handle_client_start (void *cls, const struct StartMessage *start)
4311{
4312 struct TransportClient *tc = cls;
4313 uint32_t options;
4314
4315 options = ntohl (start->options);
4316 if ((0 != (1 & options)) &&
4317 (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4318 {
4319 /* client thinks this is a different peer, reject */
4320 GNUNET_break (0);
4322 return;
4323 }
4324 if (CT_NONE != tc->type)
4325 {
4326 GNUNET_break (0);
4328 return;
4329 }
4330 tc->type = CT_CORE;
4332 "New CORE client with PID %s registered\n",
4333 GNUNET_i2s (&start->self));
4336 tc);
4339 NULL);
4341}
4342
4343
4350static int
4351check_client_send (void *cls, const struct OutboundMessage *obm)
4352{
4353 struct TransportClient *tc = cls;
4354 uint16_t size;
4355 const struct GNUNET_MessageHeader *obmm;
4356
4357 if (CT_CORE != tc->type)
4358 {
4359 GNUNET_break (0);
4360 return GNUNET_SYSERR;
4361 }
4362 size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4363 if (size < sizeof(struct GNUNET_MessageHeader))
4364 {
4365 GNUNET_break (0);
4366 return GNUNET_SYSERR;
4367 }
4368 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4369 if (size != ntohs (obmm->size))
4370 {
4371 GNUNET_break (0);
4372 return GNUNET_SYSERR;
4373 }
4374 return GNUNET_OK;
4375}
4376
4377
4385static void
4387{
4388 struct TransportClient *tc = pm->client;
4389 struct VirtualLink *vl = pm->vl;
4390
4392 "client send response\n");
4393 if (NULL != tc)
4394 {
4395 struct GNUNET_MQ_Envelope *env;
4396 struct SendOkMessage *so_msg;
4397
4399 so_msg->peer = vl->target;
4401 "Confirming transmission of <%" PRIu64 "> to %s\n",
4402 pm->logging_uuid,
4403 GNUNET_i2s (&vl->target));
4404 GNUNET_MQ_send (tc->mq, env);
4405 }
4407}
4408
4409
4419static unsigned int
4422 struct DistanceVectorHop **hops_array,
4423 unsigned int hops_array_length)
4424{
4425 uint64_t choices[hops_array_length];
4426 uint64_t num_dv;
4427 unsigned int dv_count;
4428
4429 /* Pick random vectors, but weighted by distance, giving more weight
4430 to shorter vectors */
4431 num_dv = 0;
4432 dv_count = 0;
4433 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4434 pos = pos->next_dv)
4435 {
4436 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4437 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4438 .rel_value_us == 0))
4439 continue; /* pos unconfirmed and confirmed required */
4440 num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4441 dv_count++;
4442 }
4443 if (0 == dv_count)
4444 return 0;
4445 if (dv_count <= hops_array_length)
4446 {
4447 dv_count = 0;
4448 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4449 pos = pos->next_dv)
4450 hops_array[dv_count++] = pos;
4451 return dv_count;
4452 }
4453 for (unsigned int i = 0; i < hops_array_length; i++)
4454 {
4455 int ok = GNUNET_NO;
4456 while (GNUNET_NO == ok)
4457 {
4458 choices[i] =
4460 ok = GNUNET_YES;
4461 for (unsigned int j = 0; j < i; j++)
4462 if (choices[i] == choices[j])
4463 {
4464 ok = GNUNET_NO;
4465 break;
4466 }
4467 }
4468 }
4469 dv_count = 0;
4470 num_dv = 0;
4471 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4472 pos = pos->next_dv)
4473 {
4474 uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4475
4476 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4477 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4478 .rel_value_us == 0))
4479 continue; /* pos unconfirmed and confirmed required */
4480 for (unsigned int i = 0; i < hops_array_length; i++)
4481 if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4482 hops_array[dv_count++] = pos;
4483 num_dv += delta;
4484 }
4485 return dv_count;
4486}
4487
4488
4495static int
4497 void *cls,
4499{
4500 struct TransportClient *tc = cls;
4501 uint16_t size;
4502
4503 if (CT_NONE != tc->type)
4504 {
4505 GNUNET_break (0);
4506 return GNUNET_SYSERR;
4507 }
4508 tc->type = CT_COMMUNICATOR;
4509 size = ntohs (cam->header.size) - sizeof(*cam);
4510 if (0 == size)
4511 return GNUNET_OK; /* receive-only communicator */
4513 return GNUNET_OK;
4514}
4515
4516
4522static void
4524 unsigned
4525 int free_cmc)
4526{
4527 if (0 != ntohl (cmc->im.fc_on))
4528 {
4529 /* send ACK when done to communicator for flow control! */
4530 struct GNUNET_MQ_Envelope *env;
4532
4534 "Acknowledge message with flow control id %" PRIu64 "\n",
4535 cmc->im.fc_id);
4537 ack->reserved = htonl (0);
4538 ack->fc_id = cmc->im.fc_id;
4539 ack->sender = cmc->im.neighbour_sender;
4540 GNUNET_MQ_send (cmc->tc->mq, env);
4541 }
4542
4544
4545 if (GNUNET_YES == free_cmc)
4546 {
4547 GNUNET_free (cmc);
4548 }
4549}
4550
4551
4552static void
4554{
4556}
4557
4558
4568static void
4569handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4570{
4571 struct TransportClient *tc = cls;
4572 struct VirtualLink *vl;
4573 uint32_t delta;
4574 struct CommunicatorMessageContext *cmc;
4575
4576 if (CT_CORE != tc->type)
4577 {
4578 GNUNET_break (0);
4580 return;
4581 }
4582 vl = lookup_virtual_link (&rom->peer);
4583 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4584 {
4586 "# RECV_OK dropped: virtual link unknown",
4587 1,
4588 GNUNET_NO);
4590 return;
4591 }
4592 delta = ntohl (rom->increase_window_delta);
4593 vl->core_recv_window += delta;
4595 "CORE ack receiving message, increased CORE recv window to %d\n",
4596 vl->core_recv_window);
4598 if (vl->core_recv_window <= 0)
4599 return;
4600 /* resume communicators */
4601 while (NULL != (cmc = vl->cmc_tail))
4602 {
4604 if (GNUNET_NO == cmc->continue_send)
4606 }
4607}
4608
4609
4616static void
4618 void *cls,
4620{
4621 struct TransportClient *tc = cls;
4622 uint16_t size;
4623
4624 size = ntohs (cam->header.size) - sizeof(*cam);
4625 if (0 == size)
4626 {
4628 "Receive-only communicator connected\n");
4629 return; /* receive-only communicator */
4630 }
4631 tc->details.communicator.address_prefix =
4632 GNUNET_strdup ((const char *) &cam[1]);
4633 tc->details.communicator.cc = ntohl (cam->cc);
4634 tc->details.communicator.can_burst = ntohl (cam->can_burst);
4636 "Communicator for peer %s with prefix '%s' connected %s\n",
4638 tc->details.communicator.address_prefix,
4639 tc->details.communicator.can_burst ? "can burst" :
4640 "can not burst");
4642}
4643
4644
4652static int
4654 void *cls,
4656{
4657 const struct GNUNET_MessageHeader *inbox;
4658 const char *is;
4659 uint16_t msize;
4660 uint16_t isize;
4661
4662 (void) cls;
4663 msize = ntohs (cb->header.size) - sizeof(*cb);
4664 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
4665 isize = ntohs (inbox->size);
4666 if (isize >= msize)
4667 {
4668 GNUNET_break (0);
4669 return GNUNET_SYSERR;
4670 }
4671 is = (const char *) inbox;
4672 is += isize;
4673 msize -= isize;
4674 GNUNET_assert (0 < msize);
4675 if ('\0' != is[msize - 1])
4676 {
4677 GNUNET_break (0);
4678 return GNUNET_SYSERR;
4679 }
4680 return GNUNET_OK;
4681}
4682
4683
4689static void
4691{
4692 struct EphemeralConfirmationPS ec;
4693
4695 dv->ephemeral_validity =
4698 ec.target = dv->target;
4701 ec.purpose.size = htonl (sizeof(ec));
4703 &ec,
4704 &dv->sender_sig);
4705}
4706
4707
4708static void
4710 struct TransportClient *tc);
4711
4712
4713static void
4715{
4716 struct TransportClient *tc = cls;
4718
4720 "freeing timedout queue entries\n");
4721
4722 tc->details.communicator.free_queue_entry_task = NULL;
4723 for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
4724 queue = queue->next_client)
4725 {
4726 struct QueueEntry *qep = queue->queue_head;
4727
4729 "checking QID %u for timedout queue entries\n",
4730 queue->qid);
4731 while (NULL != qep)
4732 {
4733 struct QueueEntry *pos = qep;
4735 pos->creation_timestamp, now);
4736 qep = qep->next;
4737
4739 "diff to now %s \n",
4742 {
4744 "Freeing timed out QueueEntry with MID %" PRIu64
4745 " and QID %u\n",
4746 pos->mid,
4747 queue->qid);
4748 free_queue_entry (pos, tc);
4749 }
4750 }
4751 }
4752}
4753
4754
4764static void
4766 struct PendingMessage *pm,
4767 const void *payload,
4768 size_t payload_size)
4769{
4770 struct Neighbour *n = queue->neighbour;
4772 struct GNUNET_MQ_Envelope *env;
4773 struct PendingAcknowledgement *pa;
4774
4775 GNUNET_log (
4777 "Queueing %u bytes of payload for transmission <%" PRIu64
4778 "> on queue %llu to %s\n",
4779 (unsigned int) payload_size,
4780 (NULL == pm) ? 0 : pm->logging_uuid,
4781 (unsigned long long) queue->qid,
4782 GNUNET_i2s (&queue->neighbour->pid));
4783 env = GNUNET_MQ_msg_extra (smt,
4784 payload_size,
4786 smt->qid = htonl (queue->qid);
4787 smt->mid = GNUNET_htonll (queue->mid_gen);
4788 smt->receiver = n->pid;
4789 memcpy (&smt[1], payload, payload_size);
4790 {
4791 /* Pass the env to the communicator of queue for transmission. */
4792 struct QueueEntry *qe;
4793
4794 qe = GNUNET_new (struct QueueEntry);
4795 qe->creation_timestamp = GNUNET_TIME_absolute_get ();
4796 qe->mid = queue->mid_gen;
4798 "Create QueueEntry with MID %" PRIu64
4799 " and QID %u and prefix %s\n",
4800 qe->mid,
4801 queue->qid,
4802 queue->tc->details.communicator.address_prefix);
4803 queue->mid_gen++;
4804 qe->queue = queue;
4805 if (NULL != pm)
4806 {
4807 qe->pm = pm;
4808 // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4809 // GNUNET_assert (NULL == pm->qe);
4810 if (NULL != pm->qe)
4811 {
4813 "Retransmitting message <%" PRIu64
4814 "> remove pm from qe with MID: %llu \n",
4815 pm->logging_uuid,
4816 (unsigned long long) pm->qe->mid);
4817 pm->qe->pm = NULL;
4818 }
4819 pm->qe = qe;
4820 }
4821 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4822 if (0 == queue->q_capacity)
4823 {
4824 // Messages without FC or fragments can get here.
4825 if (NULL != pm)
4826 {
4828 "Message %" PRIu64
4829 " (pm type %u) was not send because queue has no capacity.\n",
4830 pm->logging_uuid,
4831 pm->pmt);
4832 pm->qe = NULL;
4833 }
4834 GNUNET_free (env);
4835 GNUNET_free (qe);
4836 return;
4837 }
4838 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4839 queue->queue_length++;
4840 queue->tc->details.communicator.total_queue_length++;
4841 if (GNUNET_NO == queue->unlimited_length)
4842 queue->q_capacity--;
4844 "Queue %s with qid %u has capacity %" PRIu64 "\n",
4845 queue->address,
4846 queue->qid,
4847 queue->q_capacity);
4849 queue->tc->details.communicator.total_queue_length)
4850 queue->idle = GNUNET_NO;
4851 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4852 queue->idle = GNUNET_NO;
4853 if (0 == queue->q_capacity)
4854 queue->idle = GNUNET_NO;
4855
4856 if (GNUNET_NO == queue->idle)
4857 {
4858 struct TransportClient *tc = queue->tc;
4859
4860 if (NULL == tc->details.communicator.free_queue_entry_task)
4861 tc->details.communicator.free_queue_entry_task =
4863 &
4865 tc);
4866 }
4867 if (NULL != pm && NULL != (pa = pm->pa_head))
4868 {
4869 while (pm != pa->pm)
4870 pa = pa->next_pa;
4871 pa->num_send++;
4872 }
4873 // GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
4875 "Sending message MID %" PRIu64
4876 " of type %u (%u) and size %lu with MQ %p queue %s (QID %u) pending %"
4877 PRIu64 "\n",
4878 GNUNET_ntohll (smt->mid),
4879 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4880 ntohs (smt->header.size),
4881 (unsigned long) payload_size,
4882 queue->tc->mq,
4883 queue->address,
4884 queue->qid,
4885 (NULL == pm) ? 0 : pm->logging_uuid);
4886 GNUNET_MQ_send (queue->tc->mq, env);
4887 }
4888}
4889
4890
4901static struct GNUNET_TIME_Relative
4903 const struct GNUNET_MessageHeader *hdr,
4905{
4906 struct GNUNET_TIME_Absolute now;
4907 unsigned int candidates;
4908 unsigned int sel1;
4909 unsigned int sel2;
4910 struct GNUNET_TIME_Relative rtt;
4911
4912 /* Pick one or two 'random' queues from n (under constraints of options) */
4913 now = GNUNET_TIME_absolute_get ();
4914 /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4915 weight in the future; weight could be assigned by observed
4916 bandwidth (note: not sure if we should do this for this type
4917 of control traffic though). */
4918 candidates = 0;
4919 for (struct Queue *pos = n->queue_head; NULL != pos;
4920 pos = pos->next_neighbour)
4921 {
4922 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4923 (pos->validated_until.abs_value_us > now.abs_value_us))
4924 candidates++;
4925 }
4926 if (0 == candidates)
4927 {
4928 /* This can happen rarely if the last confirmed queue timed
4929 out just as we were beginning to process this message. */
4931 "Could not route message of type %u to %s: no valid queue\n",
4932 ntohs (hdr->type),
4933 GNUNET_i2s (&n->pid));
4935 "# route selection failed (all no valid queue)",
4936 1,
4937 GNUNET_NO);
4939 }
4940
4943 if (0 == (options & RMO_REDUNDANT))
4944 sel2 = candidates; /* picks none! */
4945 else
4947 candidates = 0;
4948 for (struct Queue *pos = n->queue_head; NULL != pos;
4949 pos = pos->next_neighbour)
4950 {
4951 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4952 (pos->validated_until.abs_value_us > now.abs_value_us))
4953 {
4954 if ((sel1 == candidates) || (sel2 == candidates))
4955 {
4957 "Routing message of type %u to %s using %s (#%u)\n",
4958 ntohs (hdr->type),
4959 GNUNET_i2s (&n->pid),
4960 pos->address,
4961 (sel1 == candidates) ? 1 : 2);
4962 rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4963 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4964 }
4965 candidates++;
4966 }
4967 }
4968 return rtt;
4969}
4970
4971
4976{
4980 gcry_cipher_hd_t cipher;
4981
4985 struct
4986 {
4991
4995 char aes_key[256 / 8];
4996
5000 char aes_ctr[128 / 8];
5002};
5003
5004
5013static void
5015 const struct GNUNET_ShortHashCode *iv,
5016 struct DVKeyState *key)
5017{
5018 /* must match what we defive from decapsulated key */
5020 GNUNET_CRYPTO_hkdf_expand (&key->material,
5021 sizeof(key->material),
5022 km,
5023 "gnunet-transport-dv-key",
5024 strlen ("gnunet-transport-dv-key")
5025 ,
5026 km,
5027 sizeof(*km),
5028 iv,
5029 sizeof(*iv),
5030 NULL));
5032 "Deriving backchannel key based on KM %s and IV %s\n",
5033 GNUNET_sh2s (km),
5034 GNUNET_sh2s (iv));
5035 GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
5036 GCRY_CIPHER_AES256 /* low level: go for speed */
5037 ,
5038 GCRY_CIPHER_MODE_CTR,
5039 0 /* flags */));
5040 GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
5041 &key->material.aes_key,
5042 sizeof(key->material.aes_key)));
5043 gcry_cipher_setctr (key->cipher,
5044 &key->material.aes_ctr,
5045 sizeof(key->material.aes_ctr));
5046}
5047
5048
5058static void
5059dv_hmac (const struct DVKeyState *key,
5060 struct GNUNET_HashCode *hmac,
5061 const void *data,
5062 size_t data_size)
5063{
5064 GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
5065}
5066
5067
5077static void
5078dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
5079{
5080 GNUNET_assert (0 ==
5081 gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
5082}
5083
5084
5095static enum GNUNET_GenericReturnValue
5097 void *out,
5098 const void *ciph,
5099 size_t out_size)
5100{
5101 return (0 ==
5102 gcry_cipher_decrypt (key->cipher,
5103 out, out_size,
5104 ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
5105}
5106
5107
5113static void
5115{
5116 gcry_cipher_close (key->cipher);
5117 GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
5118}
5119
5120
5131typedef void (*DVMessageHandler) (void *cls,
5132 struct Neighbour *next_hop,
5133 const struct GNUNET_MessageHeader *hdr,
5135
5150static struct GNUNET_TIME_Relative
5152 unsigned int num_dvhs,
5153 struct DistanceVectorHop **dvhs,
5154 const struct GNUNET_MessageHeader *hdr,
5155 DVMessageHandler use,
5156 void *use_cls,
5158 enum GNUNET_GenericReturnValue without_fc)
5159{
5160 struct TransportDVBoxMessage box_hdr;
5161 struct TransportDVBoxPayloadP payload_hdr;
5162 uint16_t enc_body_size = ntohs (hdr->size);
5163 char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
5164 struct DVKeyState *key;
5165 struct GNUNET_TIME_Relative rtt;
5166 struct GNUNET_ShortHashCode km;
5167
5168 key = GNUNET_new (struct DVKeyState);
5169 /* Encrypt payload */
5171 box_hdr.total_hops = htons (0);
5172 box_hdr.without_fc = htons (without_fc);
5173 // update_ephemeral (dv);
5174 if (0 ==
5175 GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
5176 {
5177 GNUNET_CRYPTO_eddsa_kem_encaps (&dv->target.public_key,
5178 &dv->ephemeral_key,
5179 &km);
5180 dv->km = GNUNET_new (struct GNUNET_ShortHashCode);
5181 GNUNET_memcpy (dv->km, &km, sizeof(struct GNUNET_ShortHashCode));
5182 sign_ephemeral (dv);
5183 }
5184 box_hdr.ephemeral_key = dv->ephemeral_key;
5185 payload_hdr.sender_sig = dv->sender_sig;
5186
5188 &box_hdr.iv,
5189 sizeof(box_hdr.iv));
5190 // We are creating this key, so this must work.
5191 // FIXME: Possibly also add return values here. We are processing
5192 // Input from other peers...
5193 dv_setup_key_state_from_km (dv->km, &box_hdr.iv, key);
5194 payload_hdr.sender = GST_my_identity;
5195 payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
5196 dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
5197 dv_encrypt (key,
5198 hdr,
5199 &enc[sizeof(struct TransportDVBoxPayloadP)],
5200 enc_body_size);
5201 dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
5202 dv_key_clean (key);
5204 /* For each selected path, take the pre-computed header and body
5205 and add the path in the middle of the message; then send it. */
5206 for (unsigned int i = 0; i < num_dvhs; i++)
5207 {
5208 struct DistanceVectorHop *dvh = dvhs[i];
5209 unsigned int num_hops = dvh->distance + 1;
5210 char buf[sizeof(struct TransportDVBoxMessage)
5211 + sizeof(struct GNUNET_PeerIdentity) * num_hops
5212 + sizeof(struct TransportDVBoxPayloadP)
5213 + enc_body_size] GNUNET_ALIGN;
5214 struct GNUNET_PeerIdentity *dhops;
5215
5216 box_hdr.header.size = htons (sizeof(buf));
5217 box_hdr.orig_size = htons (sizeof(buf));
5218 box_hdr.num_hops = htons (num_hops);
5219 memcpy (buf, &box_hdr, sizeof(box_hdr));
5220 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
5221 memcpy (dhops,
5222 dvh->path,
5223 dvh->distance * sizeof(struct GNUNET_PeerIdentity));
5224 dhops[dvh->distance] = dv->target;
5225 if (GNUNET_EXTRA_LOGGING > 0)
5226 {
5227 char *path;
5228
5230 for (unsigned int j = 0; j < num_hops; j++)
5231 {
5232 char *tmp;
5233
5234 GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
5235 GNUNET_free (path);
5236 path = tmp;
5237 }
5239 "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
5240 ntohs (hdr->type),
5241 GNUNET_i2s (&dv->target),
5242 i + 1,
5243 num_dvhs,
5244 path);
5245 GNUNET_free (path);
5246 }
5247 rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
5248 memcpy (&dhops[num_hops], enc, sizeof(enc));
5249 use (use_cls,
5250 dvh->next_hop,
5251 (const struct GNUNET_MessageHeader *) buf,
5252 options);
5253 GNUNET_free (key);
5254 }
5255 return rtt;
5256}
5257
5258
5268static void
5270 struct Neighbour *next_hop,
5271 const struct GNUNET_MessageHeader *hdr,
5273{
5274 (void) cls;
5275 (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
5276}
5277
5278
5290static struct GNUNET_TIME_Relative
5292// route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
5293 const struct GNUNET_MessageHeader *hdr,
5295{
5296 // struct VirtualLink *vl;
5297 struct Neighbour *n;
5298 struct DistanceVector *dv;
5299 struct GNUNET_TIME_Relative rtt1;
5300 struct GNUNET_TIME_Relative rtt2;
5301 const struct GNUNET_PeerIdentity *target = &vl->target;
5302
5304 "Trying to route message of type %u to %s without fc\n",
5305 ntohs (hdr->type),
5306 GNUNET_i2s (target));
5307
5308 // TODO Do this elsewhere. vl should be given as parameter to method.
5309 // vl = lookup_virtual_link (target);
5310 GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
5311 if (NULL == vl)
5313 n = vl->n;
5314 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
5315 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
5316 {
5317 /* if confirmed is required, and we do not have anything
5318 confirmed, drop respective options */
5319 if (NULL == n)
5320 n = lookup_neighbour (target);
5321 if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
5323 }
5324 if ((NULL == n) && (NULL == dv))
5325 {
5327 "Cannot route message of type %u to %s: no route\n",
5328 ntohs (hdr->type),
5329 GNUNET_i2s (target));
5331 "# Messages dropped in routing: no acceptable method",
5332 1,
5333 GNUNET_NO);
5335 }
5337 "Routing message of type %u to %s with options %X\n",
5338 ntohs (hdr->type),
5339 GNUNET_i2s (target),
5340 (unsigned int) options);
5341 /* If both dv and n are possible and we must choose:
5342 flip a coin for the choice between the two; for now 50/50 */
5343 if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5344 {
5346 n = NULL;
5347 else
5348 dv = NULL;
5349 }
5350 if ((NULL != n) && (NULL != dv))
5351 options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5352 enough for redundancy, so clear the flag. */
5355 if (NULL != n)
5356 {
5358 "Try to route message of type %u to %s without fc via neighbour\n",
5359 ntohs (hdr->type),
5360 GNUNET_i2s (target));
5361 rtt1 = route_via_neighbour (n, hdr, options);
5362 }
5363 if (NULL != dv)
5364 {
5365 struct DistanceVectorHop *hops[2];
5366 unsigned int res;
5367
5369 options,
5370 hops,
5371 (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5372 if (0 == res)
5373 {
5375 "Failed to route message, could not determine DV path\n");
5376 return rtt1;
5377 }
5379 "encapsulate_for_dv 1\n");
5380 rtt2 = encapsulate_for_dv (dv,
5381 res,
5382 hops,
5383 hdr,
5385 NULL,
5387 GNUNET_YES);
5388 }
5389 return GNUNET_TIME_relative_min (rtt1, rtt2);
5390}
5391
5392
5393static void
5394consider_sending_fc (void *cls);
5395
5402static void
5404{
5405 struct VirtualLink *vl = cls;
5406 vl->fc_retransmit_task = NULL;
5407 consider_sending_fc (cls);
5408}
5409
5410
5411static char *
5412get_address_without_port (const char *address);
5413
5414
5416{
5417 size_t off;
5418 char *tgnas;
5419};
5420
5421
5422static enum GNUNET_GenericReturnValue
5424 const struct GNUNET_PeerIdentity *pid,
5425 void *value)
5426{
5427 struct AddGlobalAddressesContext *ctx = cls;
5428 struct TransportGlobalNattedAddress *tgna = value;
5429 char *addr = (char *) &tgna[1];
5430
5432 "sending address %s length %u\n",
5433 addr,
5434 ntohl (tgna->address_length));
5435 GNUNET_memcpy (&(ctx->tgnas[ctx->off]), tgna, sizeof (struct
5437 + ntohl (tgna->address_length));
5438 ctx->off += sizeof(struct TransportGlobalNattedAddress) + ntohl (tgna->
5440
5441 return GNUNET_OK;
5442}
5443
5444
5445static struct GNUNET_TIME_Relative
5446calculate_rtt (struct DistanceVector *dv);
5447
5448
5455static void
5457{
5458 struct VirtualLink *vl = cls;
5459 struct GNUNET_TIME_Absolute monotime;
5460 struct TransportFlowControlMessage *fc;
5462 struct GNUNET_TIME_Relative rtt;
5463 struct GNUNET_TIME_Relative rtt_average;
5464 struct Neighbour *n = vl->n;
5465
5466 if (NULL != n && 0 < n->number_of_addresses)
5467 {
5468 size_t addresses_size =
5469 n->number_of_addresses * sizeof (struct TransportGlobalNattedAddress) + n
5470 ->size_of_global_addresses;
5471 char *tgnas = GNUNET_malloc (addresses_size);
5473 ctx.off = 0;
5474 ctx.tgnas = tgnas;
5475
5476 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage)
5477 + addresses_size);
5478 fc->header.size = htons (sizeof(struct TransportFlowControlMessage)
5479 + addresses_size);
5480 fc->size_of_addresses = htonl (n->size_of_global_addresses);
5481 fc->number_of_addresses = htonl (n->number_of_addresses);
5482 GNUNET_CONTAINER_multipeermap_iterate (n->natted_addresses,
5484 &ctx);
5485 GNUNET_memcpy (&fc[1], tgnas, addresses_size);
5487 }
5488 else
5489 {
5490 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage));
5491 fc->header.size = htons (sizeof(struct TransportFlowControlMessage));
5492 }
5493
5495 /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5496 it always! */
5497 /* For example, we should probably ONLY do this if a bit more than
5498 an RTT has passed, or if the window changed "significantly" since
5499 then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5500 need an estimate for the bandwidth-delay-product for the entire
5501 VL, as that determines "significantly". We have the delay, but
5502 the bandwidth statistics need to be added for the VL!*/(void) duration;
5503
5504 if (NULL != vl->dv)
5505 rtt_average = calculate_rtt (vl->dv);
5506 else
5507 rtt_average = GNUNET_TIME_UNIT_FOREVER_REL;
5508 fc->rtt = GNUNET_TIME_relative_hton (rtt_average);
5510 "Sending FC seq %u to %s with new window %llu %lu %u\n",
5511 (unsigned int) vl->fc_seq_gen,
5512 GNUNET_i2s (&vl->target),
5513 (unsigned long long) vl->incoming_fc_window_size,
5514 (unsigned long) rtt_average.rel_value_us,
5515 vl->sync_ready);
5517 vl->last_fc_transmission = monotime;
5518 fc->sync_ready = vl->sync_ready;
5520 fc->seq = htonl (vl->fc_seq_gen++);
5526 fc->sender_time = GNUNET_TIME_absolute_hton (monotime);
5528 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5529 {
5532 "FC retransmission to %s failed, will retry in %s\n",
5533 GNUNET_i2s (&vl->target),
5536 }
5537 else
5538 {
5539 /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5540 vl->last_fc_rtt = rtt;
5541 }
5542 if (NULL != vl->fc_retransmit_task)
5545 {
5547 vl->fc_retransmit_count = 0;
5548 }
5549 vl->fc_retransmit_task =
5551 vl->fc_retransmit_count++;
5552 GNUNET_free (fc);
5553}
5554
5555
5572static void
5574{
5575 struct Neighbour *n = vl->n;
5576 struct DistanceVector *dv = vl->dv;
5577 struct GNUNET_TIME_Absolute now;
5578 struct VirtualLink *vl_next_hop;
5579 int elig;
5580
5582 "check_vl_transmission to target %s\n",
5583 GNUNET_i2s (&vl->target));
5584 /* Check that we have an eligible pending message!
5585 (cheaper than having #transmit_on_queue() find out!) */
5586 elig = GNUNET_NO;
5587 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5588 pm = pm->next_vl)
5589 {
5591 "check_vl_transmission loop\n");
5592 if (NULL != pm->qe)
5593 continue; /* not eligible, is in a queue! */
5594 if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5596 {
5598 "Stalled message %" PRIu64
5599 " transmission on VL %s due to flow control: %llu < %llu\n",
5600 pm->logging_uuid,
5601 GNUNET_i2s (&vl->target),
5602 (unsigned long long) vl->outbound_fc_window_size,
5603 (unsigned long long) (pm->bytes_msg
5606 return; /* We have a message, but flow control says "nope" */
5607 }
5609 "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5610 GNUNET_i2s (&vl->target));
5611 /* Notify queues at direct neighbours that we are interested */
5612 now = GNUNET_TIME_absolute_get ();
5613 if (NULL != n)
5614 {
5615 for (struct Queue *queue = n->queue_head; NULL != queue;
5616 queue = queue->next_neighbour)
5617 {
5618 if ((GNUNET_YES == queue->idle) &&
5619 (queue->validated_until.abs_value_us > now.abs_value_us))
5620 {
5622 "Direct neighbour %s not stalled\n",
5623 GNUNET_i2s (&n->pid));
5625 queue,
5627 elig = GNUNET_YES;
5628 }
5629 else
5631 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5632 queue->qid,
5633 queue->idle);
5634 }
5635 }
5636 /* Notify queues via DV that we are interested */
5637 if (NULL != dv)
5638 {
5639 /* Do DV with lower scheduler priority, which effectively means that
5640 IF a neighbour exists and is available, we prefer it. */
5641 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5642 pos = pos->next_dv)
5643 {
5644 struct Neighbour *nh_iter = pos->next_hop;
5645
5646
5647 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5648 continue; /* skip this one: path not validated */
5649 else
5650 {
5651 vl_next_hop = lookup_virtual_link (&nh_iter->pid);
5652 GNUNET_assert (NULL != vl_next_hop);
5653 if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5654 vl_next_hop->outbound_fc_window_size)
5655 {
5657 "Stalled message %" PRIu64
5658 " transmission on next hop %s due to flow control: %llu < %llu\n",
5659 pm->logging_uuid,
5660 GNUNET_i2s (&vl_next_hop->target),
5661 (unsigned long
5662 long) vl_next_hop->outbound_fc_window_size,
5663 (unsigned long long) (pm->bytes_msg
5664 + vl_next_hop->
5665 outbound_fc_window_size_used));
5666 consider_sending_fc (vl_next_hop);
5667 continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5668 }
5669 for (struct Queue *queue = nh_iter->queue_head; NULL != queue;
5670 queue = queue->next_neighbour)
5671 if ((GNUNET_YES == queue->idle) &&
5672 (queue->validated_until.abs_value_us > now.abs_value_us))
5673 {
5675 "Next hop neighbour %s not stalled\n",
5676 GNUNET_i2s (&nh_iter->pid));
5678 queue,
5680 elig = GNUNET_YES;
5681 }
5682 else
5684 "DV Queue QID: %u (%u) busy or invalid\n",
5685 queue->qid,
5686 queue->idle);
5687 }
5688 }
5689 }
5690 if (GNUNET_YES == elig)
5692 "Eligible message %" PRIu64 " of size %u to %s: %llu/%llu\n",
5693 pm->logging_uuid,
5694 pm->bytes_msg,
5695 GNUNET_i2s (&vl->target),
5696 (unsigned long long) vl->outbound_fc_window_size,
5697 (unsigned long long) (pm->bytes_msg
5699 break;
5700 }
5701}
5702
5703
5710static void
5711handle_client_send (void *cls, const struct OutboundMessage *obm)
5712{
5713 struct TransportClient *tc = cls;
5714 struct PendingMessage *pm;
5715 const struct GNUNET_MessageHeader *obmm;
5716 uint32_t bytes_msg;
5717 struct VirtualLink *vl;
5719
5720 GNUNET_assert (CT_CORE == tc->type);
5721 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5722 bytes_msg = ntohs (obmm->size);
5723 pp = ntohl (obm->priority);
5724 vl = lookup_virtual_link (&obm->peer);
5725 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5726 {
5728 "Don't have %s as a neighbour (anymore).\n",
5729 GNUNET_i2s (&obm->peer));
5730 /* Failure: don't have this peer as a neighbour (anymore).
5731 Might have gone down asynchronously, so this is NOT
5732 a protocol violation by CORE. Still count the event,
5733 as this should be rare. */
5736 "# messages dropped (neighbour unknown)",
5737 1,
5738 GNUNET_NO);
5739 return;
5740 }
5741
5742 pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5744 "1 created pm %p storing vl %p\n",
5745 pm,
5746 vl);
5747 pm->logging_uuid = logging_uuid_gen++;
5748 pm->prefs = pp;
5749 pm->client = tc;
5750 pm->vl = vl;
5751 pm->bytes_msg = bytes_msg;
5752 memcpy (&pm[1], obmm, bytes_msg);
5754 "Sending message of type %u with %u bytes as <%" PRIu64
5755 "> to %s\n",
5756 ntohs (obmm->type),
5757 bytes_msg,
5758 pm->logging_uuid,
5759 GNUNET_i2s (&obm->peer));
5761 tc->details.core.pending_msg_head,
5762 tc->details.core.pending_msg_tail,
5763 pm);
5765 vl->pending_msg_head,
5766 vl->pending_msg_tail,
5767 pm);
5770}
5771
5772
5782static void
5784 void *cls,
5786{
5787 struct Neighbour *n;
5788 struct VirtualLink *vl;
5789 struct TransportClient *tc = cls;
5790 const struct GNUNET_MessageHeader *inbox =
5791 (const struct GNUNET_MessageHeader *) &cb[1];
5792 uint16_t isize = ntohs (inbox->size);
5793 const char *is = ((const char *) &cb[1]) + isize;
5794 size_t slen = strlen (is) + 1;
5795 char
5796 mbuf[slen + isize
5797 + sizeof(struct
5801
5802 /* 0-termination of 'is' was checked already in
5803 #check_communicator_backchannel() */
5805 "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5806 GNUNET_i2s (&cb->pid),
5807 is,
5808 ntohs (inbox->type),
5809 ntohs (inbox->size));
5810 /* encapsulate and encrypt message */
5811 be->header.type =
5813 be->header.size = htons (sizeof(mbuf));
5814 memcpy (&be[1], inbox, isize);
5815 memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5816 + isize],
5817 is,
5818 strlen (is) + 1);
5819 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5820 vl = lookup_virtual_link (&cb->pid);
5821 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5822 {
5824 }
5825 else
5826 {
5827 /* Use route via neighbour */
5828 n = lookup_neighbour (&cb->pid);
5829 if (NULL != n)
5831 n,
5832 &be->header,
5833 RMO_NONE);
5834 }
5836}
5837
5838
5846static int
5848 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5849{
5850 struct TransportClient *tc = cls;
5851
5852 if (CT_COMMUNICATOR != tc->type)
5853 {
5854 GNUNET_break (0);
5855 return GNUNET_SYSERR;
5856 }
5858 return GNUNET_OK;
5859}
5860
5861
5867static void
5868store_pi (void *cls);
5869
5870
5877static void
5878peerstore_store_own_cb (void *cls, int success)
5879{
5880 struct AddressListEntry *ale = cls;
5881
5882 ale->sc = NULL;
5883 if (GNUNET_YES != success)
5885 "Failed to store our own address `%s' in peerstore!\n",
5886 ale->address);
5887 else
5889 "Successfully stored our own address `%s' in peerstore!\n",
5890 ale->address);
5891 /* refresh period is 1/4 of expiration time, that should be plenty
5892 without being excessive. */
5893 ale->st =
5895 4ULL),
5896 &store_pi,
5897 ale);
5898}
5899
5900
5901static void
5902shc_cont (void *cls, int success)
5903{
5904 struct AddressListEntry *ale = cls;
5906
5909 "transport",
5912 ale->signed_address,
5913 ale->signed_address_len,
5914 expiration,
5917 ale);
5918 if (NULL == ale->sc)
5919 {
5921 "Failed to store our address `%s' with peerstore\n",
5922 ale->address);
5924 &store_pi,
5925 ale);
5926 }
5927}
5928
5929
5935static void
5936store_pi (void *cls)
5937{
5938 struct AddressListEntry *ale = cls;
5939 struct GNUNET_MQ_Envelope *env;
5940 const struct GNUNET_MessageHeader *msg;
5941 const char *dash;
5942 char *address_uri;
5944 unsigned int add_success;
5945
5946 dash = strchr (ale->address, '-');
5947 GNUNET_assert (NULL != dash);
5948 dash++;
5949 GNUNET_asprintf (&address_uri,
5950 "%s://%s",
5951 prefix,
5952 dash);
5954 ale->st = NULL;
5956 "Storing our address `%s' in peerstore until %s!\n",
5957 ale->address,
5960 address_uri);
5961 if (GNUNET_OK != add_success)
5962 {
5964 "Storing our address `%s' %s\n",
5965 address_uri,
5966 GNUNET_NO == add_success ? "not done" : "failed");
5967 GNUNET_free (address_uri);
5968 return;
5969 }
5970 else
5971 {
5972
5974 "Storing our address `%s'\n",
5975 address_uri);
5976 }
5977 // FIXME hello_mono_time used here?? What about expiration in ale?
5979 ale->nt,
5982 &ale->signed_address,
5983 &ale->signed_address_len);
5984 GNUNET_free (address_uri);
5990 "store_pi 1\n");
5992 msg,
5993 shc_cont,
5994 ale);
5995 GNUNET_free (env);
5996}
5997
5998
5999static struct AddressListEntry *
6003 const char *address,
6004 uint32_t aid,
6005 size_t slen)
6006{
6007 struct AddressListEntry *ale;
6008 char *address_without_port;
6009
6010 ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
6011 ale->tc = tc;
6012 ale->address = (const char *) &ale[1];
6013 ale->expiration = expiration;
6014 ale->aid = aid;
6015 ale->nt = nt;
6016 memcpy (&ale[1], address, slen);
6017 address_without_port = get_address_without_port (ale->address);
6019 "Is this %s a local address (%s)\n",
6020 address_without_port,
6021 ale->address);
6022 if (0 != strcmp ("127.0.0.1", address_without_port))
6023 ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
6024 GNUNET_free (address_without_port);
6025
6026 return ale;
6027}
6028
6029
6036static void
6038 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
6039{
6040 struct TransportClient *tc = cls;
6041 struct AddressListEntry *ale;
6042 size_t slen;
6043 char *address;
6044
6045 /* 0-termination of &aam[1] was checked in #check_add_address */
6047 "Communicator added address `%s'!\n",
6048 (const char *) &aam[1]);
6049 slen = ntohs (aam->header.size) - sizeof(*aam);
6050 address = GNUNET_malloc (slen);
6051 memcpy (address, &aam[1], slen);
6052 ale = create_address_entry (tc,
6054 ntohl (aam->nt),
6055 address,
6056 aam->aid,
6057 slen);
6058 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
6059 tc->details.communicator.addr_tail,
6060 ale);
6063}
6064
6065
6072static void
6074 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
6075{
6076 struct TransportClient *tc = cls;
6077 struct AddressListEntry *alen;
6078
6079 if (CT_COMMUNICATOR != tc->type)
6080 {
6081 GNUNET_break (0);
6083 return;
6084 }
6085 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
6086 NULL != ale;
6087 ale = alen)
6088 {
6089 alen = ale->next;
6090 if (dam->aid != ale->aid)
6091 continue;
6092 GNUNET_assert (ale->tc == tc);
6094 "Communicator deleted address `%s'!\n",
6095 ale->address);
6098 return;
6099 }
6101 "Communicator removed address we did not even have.\n");
6103 // GNUNET_SERVICE_client_drop (tc->client);
6104}
6105
6106
6114static void
6116
6117
6125static void
6127{
6128 struct CoreSentContext *ctx = cls;
6129 struct VirtualLink *vl = ctx->vl;
6130
6131 if (NULL == vl)
6132 {
6133 /* lost the link in the meantime, ignore */
6134 GNUNET_free (ctx);
6135 return;
6136 }
6139 vl->incoming_fc_window_size_ram -= ctx->size;
6140 vl->incoming_fc_window_size_used += ctx->isize;
6142 GNUNET_free (ctx);
6143}
6144
6145
6146static void
6148 const struct GNUNET_MessageHeader *mh,
6149 struct CommunicatorMessageContext *cmc,
6150 unsigned int free_cmc)
6151{
6152 uint16_t size = ntohs (mh->size);
6153 int have_core;
6154
6155 if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
6156 {
6158 "# CORE messages dropped (FC arithmetic overflow)",
6159 1,
6160 GNUNET_NO);
6162 "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
6163 (unsigned int) ntohs (mh->type),
6164 (unsigned int) ntohs (mh->size));
6165 if (GNUNET_YES == free_cmc)
6167 return;
6168 }
6170 {
6172 "# CORE messages dropped (FC window overflow)",
6173 1,
6174 GNUNET_NO);
6176 "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
6177 (unsigned int) ntohs (mh->type),
6178 (unsigned int) ntohs (mh->size));
6179 if (GNUNET_YES == free_cmc)
6181 return;
6182 }
6183
6184 /* Forward to all CORE clients */
6185 have_core = GNUNET_NO;
6186 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
6187 {
6188 struct GNUNET_MQ_Envelope *env;
6189 struct InboundMessage *im;
6190 struct CoreSentContext *ctx;
6191
6192 if (CT_CORE != tc->type)
6193 continue;
6196 ctx = GNUNET_new (struct CoreSentContext);
6197 ctx->vl = vl;
6198 ctx->size = size;
6199 ctx->isize = (GNUNET_NO == have_core) ? size : 0;
6200 have_core = GNUNET_YES;
6203 im->peer = cmc->im.sender;
6204 memcpy (&im[1], mh, size);
6205 GNUNET_MQ_send (tc->mq, env);
6207 }
6208 if (GNUNET_NO == have_core)
6209 {
6211 "Dropped message to CORE: no CORE client connected!\n");
6212 /* Nevertheless, count window as used, as it is from the
6213 perspective of the other peer! */
6215 /* TODO-M1 */
6217 "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
6218 (unsigned int) ntohs (mh->type),
6219 (unsigned int) ntohs (mh->size));
6220 if (GNUNET_YES == free_cmc)
6222 return;
6223 }
6225 "Delivered message from %s of type %u to CORE recv window %d\n",
6226 GNUNET_i2s (&cmc->im.sender),
6227 ntohs (mh->type),
6229 if (vl->core_recv_window > 0)
6230 {
6231 if (GNUNET_YES == free_cmc)
6233 return;
6234 }
6235 /* Wait with calling #finish_cmc_handling(cmc) until the message
6236 was processed by CORE MQs (for CORE flow control)! */
6237 if (GNUNET_YES == free_cmc)
6239}
6240
6241
6250static void
6252{
6253 struct CommunicatorMessageContext *cmc = cls;
6254 // struct CommunicatorMessageContext *cmc_copy =
6255 // GNUNET_new (struct CommunicatorMessageContext);
6256 struct GNUNET_MessageHeader *mh_copy;
6257 struct RingBufferEntry *rbe;
6258 struct VirtualLink *vl;
6259 uint16_t size = ntohs (mh->size);
6260
6262 "Handling raw message of type %u with %u bytes\n",
6263 (unsigned int) ntohs (mh->type),
6264 (unsigned int) ntohs (mh->size));
6265
6266 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
6267 (size < sizeof(struct GNUNET_MessageHeader)))
6268 {
6269 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6270
6271 GNUNET_break (0);
6272 finish_cmc_handling (cmc);
6274 return;
6275 }
6276 vl = lookup_virtual_link (&cmc->im.sender);
6277 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6278 {
6279 /* FIXME: sender is giving us messages for CORE but we don't have
6280 the link up yet! I *suspect* this can happen right now (i.e.
6281 sender has verified us, but we didn't verify sender), but if
6282 we pass this on, CORE would be confused (link down, messages
6283 arrive). We should investigate more if this happens often,
6284 or in a persistent manner, and possibly do "something" about
6285 it. Thus logging as error for now. */
6286
6287 mh_copy = GNUNET_malloc (size);
6288 rbe = GNUNET_new (struct RingBufferEntry);
6289 rbe->cmc = cmc;
6290 /*cmc_copy->tc = cmc->tc;
6291 cmc_copy->im = cmc->im;*/
6292 GNUNET_memcpy (mh_copy, mh, size);
6293
6294 rbe->mh = mh_copy;
6295
6297 {
6298 struct RingBufferEntry *rbe_old = ring_buffer[ring_buffer_head];
6299 GNUNET_free (rbe_old->cmc);
6300 GNUNET_free (rbe_old->mh);
6301 GNUNET_free (rbe_old);
6302 }
6303 ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
6304 // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6305 cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6307 "Storing message for %s and type %u (%u) in ring buffer head %u is full %u\n",
6308 GNUNET_i2s (&cmc->im.sender),
6309 (unsigned int) ntohs (mh->type),
6310 (unsigned int) ntohs (mh_copy->type),
6314 {
6315 ring_buffer_head = 0;
6317 }
6318 else
6320
6322 "%u items stored in ring buffer\n",
6325
6326 /*GNUNET_break_op (0);
6327 GNUNET_STATISTICS_update (GST_stats,
6328 "# CORE messages dropped (virtual link still down)",
6329 1,
6330 GNUNET_NO);
6331
6332 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6333 "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
6334 (unsigned int) ntohs (mh->type),
6335 (unsigned int) ntohs (mh->size));
6336 finish_cmc_handling (cmc);*/
6339 // GNUNET_free (cmc);
6340 return;
6341 }
6343}
6344
6345
6353static int
6355{
6356 uint16_t size = ntohs (fb->header.size);
6357 uint16_t bsize = size - sizeof(*fb);
6358
6359 (void) cls;
6360 if (0 == bsize)
6361 {
6362 GNUNET_break_op (0);
6363 return GNUNET_SYSERR;
6364 }
6365 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
6366 {
6367 GNUNET_break_op (0);
6368 return GNUNET_SYSERR;
6369 }
6370 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
6371 {
6372 GNUNET_break_op (0);
6373 return GNUNET_SYSERR;
6374 }
6375 return GNUNET_YES;
6376}
6377
6378
6384static void
6386{
6387 struct AcknowledgementCummulator *ac = cls;
6388
6389 ac->task = NULL;
6390 GNUNET_assert (0 == ac->num_acks);
6392 GNUNET_YES ==
6394 GNUNET_free (ac);
6395}
6396
6397
6403static void
6405{
6406 struct Neighbour *n;
6407 struct VirtualLink *vl;
6408 struct AcknowledgementCummulator *ac = cls;
6409 char buf[sizeof(struct TransportReliabilityAckMessage)
6410 + ac->num_acks
6412 struct TransportReliabilityAckMessage *ack =
6413 (struct TransportReliabilityAckMessage *) buf;
6415
6416 ac->task = NULL;
6418 "Sending ACK with %u components to %s\n",
6419 ac->num_acks,
6420 GNUNET_i2s (&ac->target));
6421 GNUNET_assert (0 < ac->num_acks);
6423 ack->header.size =
6424 htons (sizeof(*ack)
6425 + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
6426 ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
6427 ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
6428 for (unsigned int i = 0; i < ac->num_acks; i++)
6429 {
6430 ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
6433 }
6434 /*route_control_message_without_fc (
6435 &ac->target,
6436 &ack->header,
6437 RMO_DV_ALLOWED);*/
6438 vl = lookup_virtual_link (&ac->target);
6439 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6440 {
6442 vl,
6443 &ack->header,
6445 }
6446 else
6447 {
6448 /* Use route via neighbour */
6449 n = lookup_neighbour (&ac->target);
6450 if (NULL != n)
6452 n,
6453 &ack->header,
6454 RMO_NONE);
6455 }
6456 ac->num_acks = 0;
6459 ac);
6460}
6461
6462
6471static void
6473 const struct AcknowledgementUUIDP *ack_uuid,
6474 struct GNUNET_TIME_Absolute max_delay)
6475{
6476 struct AcknowledgementCummulator *ac;
6477
6479 "Scheduling ACK %s for transmission to %s\n",
6480 GNUNET_uuid2s (&ack_uuid->value),
6481 GNUNET_i2s (pid));
6483 if (NULL == ac)
6484 {
6486 ac->target = *pid;
6487 ac->min_transmission_time = max_delay;
6491 &ac->target,
6492 ac,
6494 }
6495 else
6496 {
6497 if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
6498 {
6499 /* must run immediately, ack buffer full! */
6501 }
6505 }
6508 ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6509 ac->num_acks++;
6512 ac);
6513}
6514
6515
6520{
6525
6530};
6531
6532
6542static int
6543find_by_message_uuid (void *cls, uint32_t key, void *value)
6544{
6545 struct FindByMessageUuidContext *fc = cls;
6546 struct ReassemblyContext *rc = value;
6547
6548 (void) key;
6549 if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6550 {
6551 fc->rc = rc;
6552 return GNUNET_NO;
6553 }
6554 return GNUNET_YES;
6555}
6556
6557
6565static void
6567{
6568 struct CommunicatorMessageContext *cmc = cls;
6569 struct VirtualLink *vl;
6570 struct ReassemblyContext *rc;
6571 const struct GNUNET_MessageHeader *msg;
6572 uint16_t msize;
6573 uint16_t fsize;
6574 uint16_t frag_off;
6575 char *target;
6576 struct GNUNET_TIME_Relative cdelay;
6577 struct FindByMessageUuidContext fc;
6578
6579 vl = lookup_virtual_link (&cmc->im.sender);
6580 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6581 {
6582 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6583
6585 "No virtual link for %s to handle fragment\n",
6586 GNUNET_i2s (&cmc->im.sender));
6587 GNUNET_break (0);
6588 finish_cmc_handling (cmc);
6590 return;
6591 }
6592 if (NULL == vl->reassembly_map)
6593 {
6595 vl->reassembly_heap =
6600 vl);
6601 }
6602 msize = ntohs (fb->msg_size);
6603 fc.message_uuid = fb->msg_uuid;
6604 fc.rc = NULL;
6606 fb->msg_uuid.uuid,
6608 &fc);
6609 fsize = ntohs (fb->header.size) - sizeof(*fb);
6610 if (NULL == (rc = fc.rc))
6611 {
6612 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6613 + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6614 rc->msg_uuid = fb->msg_uuid;
6615 rc->virtual_link = vl;
6616 rc->msg_size = msize;
6617 rc->reassembly_timeout =
6621 rc,
6625 vl->reassembly_map,
6626 rc->msg_uuid.uuid,
6627 rc,
6629 target = (char *) &rc[1];
6630 rc->bitfield = (uint8_t *) (target + rc->msg_size);
6631 if (fsize != rc->msg_size)
6632 rc->msg_missing = rc->msg_size;
6633 else
6634 rc->msg_missing = 0;
6636 "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %"
6637 PRIu64 "\n",
6638 fsize,
6639 ntohs (fb->frag_off),
6640 msize,
6641 rc->msg_missing,
6642 GNUNET_i2s (&cmc->im.sender),
6643 fb->msg_uuid.uuid);
6644 }
6645 else
6646 {
6647 target = (char *) &rc[1];
6649 "Received fragment at offset %u/%u from %s for message %u\n",
6650 ntohs (fb->frag_off),
6651 msize,
6652 GNUNET_i2s (&cmc->im.sender),
6653 (unsigned int) fb->msg_uuid.uuid);
6654 }
6655 if (msize != rc->msg_size)
6656 {
6657 GNUNET_break (0);
6658 finish_cmc_handling (cmc);
6659 return;
6660 }
6661
6662 /* reassemble */
6663 if (0 == fsize)
6664 {
6665 GNUNET_break (0);
6666 finish_cmc_handling (cmc);
6667 return;
6668 }
6669 frag_off = ntohs (fb->frag_off);
6670 if (frag_off + fsize > msize)
6671 {
6672 /* Fragment (plus fragment size) exceeds message size! */
6673 GNUNET_break_op (0);
6674 finish_cmc_handling (cmc);
6675 return;
6676 }
6677 memcpy (&target[frag_off], &fb[1], fsize);
6678 /* update bitfield and msg_missing */
6679 for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6680 {
6681 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6682 {
6683 rc->bitfield[i / 8] |= (1 << (i % 8));
6684 rc->msg_missing--;
6685 }
6686 }
6687
6688 /* Compute cumulative ACK */
6690 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6691 if (0 == rc->msg_missing)
6692 cdelay = GNUNET_TIME_UNIT_ZERO;
6693 cummulative_ack (&cmc->im.sender,
6694 &fb->ack_uuid,
6697 /* is reassembly complete? */
6698 if (0 != rc->msg_missing)
6699 {
6700 finish_cmc_handling (cmc);
6701 return;
6702 }
6703 /* reassembly is complete, verify result */
6704 msg = (const struct GNUNET_MessageHeader *) &rc[1];
6705 if (ntohs (msg->size) != rc->msg_size)
6706 {
6707 GNUNET_break (0);
6709 finish_cmc_handling (cmc);
6710 return;
6711 }
6712 /* successful reassembly */
6714 "Fragment reassembly complete for message %u\n",
6715 (unsigned int) fb->msg_uuid.uuid);
6716 /* FIXME: check that the resulting msg is NOT a
6717 DV Box or Reliability Box, as that is NOT allowed! */
6718 cmc->mh = msg;
6720 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6721 en-route and we forget that we finished this reassembly immediately!
6722 -> keep around until timeout?
6723 -> shorten timeout based on ACK? */
6725}
6726
6727
6735static int
6737 const struct TransportReliabilityBoxMessage *rb)
6738{
6739 const struct GNUNET_MessageHeader *box = (const struct
6740 GNUNET_MessageHeader *) &rb[1];
6741 (void) cls;
6742
6744 "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6745 ntohs (rb->header.size),
6746 ntohs (box->type),
6747 ntohs (box->size),
6748 sizeof (struct TransportReliabilityBoxMessage),
6749 sizeof (struct GNUNET_MessageHeader));
6751 return GNUNET_YES;
6752}
6753
6754
6762static void
6764 const struct TransportReliabilityBoxMessage *rb)
6765{
6766 struct CommunicatorMessageContext *cmc = cls;
6767 const struct GNUNET_MessageHeader *inbox =
6768 (const struct GNUNET_MessageHeader *) &rb[1];
6769 struct GNUNET_TIME_Relative rtt;
6770
6772 "Received reliability box from %s with UUID %s of type %u\n",
6773 GNUNET_i2s (&cmc->im.sender),
6775 (unsigned int) ntohs (inbox->type));
6776 rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6777 do not really have an RTT for the
6778 * incoming* queue (should we have
6779 the sender add it to the rb message?) */
6781 &cmc->im.sender,
6782 &rb->ack_uuid,
6783 (0 == ntohl (rb->ack_countdown))
6786 GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6787 /* continue with inner message */
6788 /* FIXME: check that inbox is NOT a DV Box, fragment or another
6789 reliability box (not allowed!) */
6790 cmc->mh = inbox;
6792}
6793
6794
6803static void
6804update_pd_age (struct PerformanceData *pd, unsigned int age)
6805{
6806 unsigned int sage;
6807
6808 if (age == pd->last_age)
6809 return; /* nothing to do */
6810 sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6811 for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6812 {
6813 struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6814
6815 the->bytes_sent = 0;
6816 the->bytes_received = 0;
6817 }
6818 pd->last_age = age;
6819}
6820
6821
6830static void
6832 struct GNUNET_TIME_Relative rtt,
6833 uint16_t bytes_transmitted_ok)
6834{
6835 uint64_t nval = rtt.rel_value_us;
6836 uint64_t oval = pd->aged_rtt.rel_value_us;
6837 unsigned int age = get_age ();
6838 struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6839
6840 if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6841 pd->aged_rtt = rtt;
6842 else
6843 pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
6844 update_pd_age (pd, age);
6845 the->bytes_received += bytes_transmitted_ok;
6846}
6847
6848
6856static void
6858 struct GNUNET_TIME_Relative rtt,
6859 uint16_t bytes_transmitted_ok)
6860{
6861 update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6862}
6863
6864
6872static void
6874 struct GNUNET_TIME_Relative rtt,
6875 uint16_t bytes_transmitted_ok)
6876{
6877 update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6878}
6879
6880
6888static void
6890{
6891 struct PendingMessage *pos;
6892
6894 "Complete transmission of message %" PRIu64 " %u\n",
6895 pm->logging_uuid,
6896 pm->pmt);
6897 switch (pm->pmt)
6898 {
6899 case PMT_CORE:
6901 /* Full message sent, we are done */
6903 return;
6904
6905 case PMT_FRAGMENT_BOX:
6906 /* Fragment sent over reliable channel */
6907 pos = pm->frag_parent;
6911 "pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
6912 (unsigned long) pos->frag_off,
6913 (unsigned long) pos->bytes_msg,
6914 pos->pmt,
6915 NULL == pos->frag_parent ? 1 : 0);
6916 /* check if subtree is done */
6917 while ((NULL == pos->head_frag) && (pos->frag_off == (pos->bytes_msg
6918 - sizeof(struct
6920 &&
6921 (NULL != pos->frag_parent))
6922 {
6923 pm = pos;
6924 pos = pm->frag_parent;
6925 if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6926 {
6928 return;
6929 }
6930 else if (PMT_DV_BOX == pm->pmt)
6931 {
6933 return;
6934 }
6936