GNUnet 0.22.0
gnunet-service-transport.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
75#include "platform.h"
76#include "gnunet_util_lib.h"
80#include "gnunet_nat_service.h"
82#include "gnunet_signatures.h"
83#include "transport.h"
84
88#define RING_BUFFER_SIZE 16
89
93#define MAX_FC_RETRANSMIT_COUNT 1000
94
99#define MAX_CUMMULATIVE_ACKS 64
100
113#define FC_NO_CHANGE_REPLY_PROBABILITY 8
114
119#define IN_PACKET_SIZE_WITHOUT_MTU 128
120
125#define GOODPUT_AGING_SLOTS 4
126
131#define DEFAULT_WINDOW_SIZE (128 * 1024)
132
141#define MAX_INCOMING_REQUEST 16
142
147#define MAX_DV_DISCOVERY_SELECTION 16
148
157#define RECV_WINDOW_SIZE 4
158
166#define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
167
171#define MAX_DV_HOPS_ALLOWED 16
172
177#define MAX_DV_LEARN_PENDING 64
178
182#define MAX_DV_PATHS_TO_TARGET 3
183
189#define DELAY_WARN_THRESHOLD \
190 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
191
196#define DV_FORWARD_TIMEOUT \
197 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
198
202#define DEFAULT_ACK_WAIT_DURATION \
203 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
204
210#define DV_QUALITY_RTT_THRESHOLD \
211 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
212
217#define DV_PATH_VALIDITY_TIMEOUT \
218 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
219
224#define BACKCHANNEL_INACTIVITY_TIMEOUT \
225 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
226
231#define DV_PATH_DISCOVERY_FREQUENCY \
232 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
233
237#define EPHEMERAL_VALIDITY \
238 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
239
243#define REASSEMBLY_EXPIRATION \
244 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
245
250#define FAST_VALIDATION_CHALLENGE_FREQ \
251 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
252
256#define MAX_VALIDATION_CHALLENGE_FREQ \
257 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
258
264#define ACK_CUMMULATOR_TIMEOUT \
265 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
266
271#define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
272
277#define DV_LEARN_QUALITY_THRESHOLD 100
278
282#define MAX_ADDRESS_VALID_UNTIL \
283 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
284
288#define ADDRESS_VALIDATION_LIFETIME \
289 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
290
297#define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
298
305#define VALIDATION_RTT_BUFFER_FACTOR 3
306
313#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
314
320#define QUEUE_LENGTH_LIMIT 32
321
325#define QUEUE_ENTRY_TIMEOUT \
326 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
327
332#define RTT_DIFF \
333 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
334
336
341{
347};
348
349
354{
359};
360
365{
370
371 /* Followed by *another* message header which is the message to
372 the communicator */
373
374 /* Followed by a 0-terminated name of the communicator */
375};
376
377
382{
387
403
408
414};
415
416
422{
427
433
445
446 /* Followed by a `struct GNUNET_MessageHeader` with a message
447 for the target peer */
448};
449
450
456{
461
469
476};
477
478
483{
491
496};
497
498
507{
512
518
519 /* followed by any number of `struct TransportCummulativeAckPayloadP`
520 messages providing ACKs */
521};
522
523
528{
533
538
543
552
558};
559
560
579{
584
598
603};
604
605
623{
628
633
638
643};
644
645
651{
656
662};
663
664
679{
684
690
700
707
721
727
732
737
738 /* Followed by @e num_hops `struct DVPathEntryP` values,
739 excluding the initiator of the DV trace; the last entry is the
740 current sender; the current peer must not be included. */
741};
742
743
767{
772
776 unsigned int without_fc;
777
785
792
798
805
812
819
820 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
821 excluding the @e origin and the current peer, the last must be
822 the ultimate target; if @e num_hops is zero, the receiver of this
823 message is the ultimate target. */
824
825 /* Followed by encrypted, variable-size payload, which
826 must begin with a `struct TransportDVBoxPayloadP` */
827
828 /* Followed by the actual message, which itself must not be
829 a DV_LEARN or DV_BOX message! */
830};
831
832
838{
843
848
853
859};
860
861
867{
872
878
883};
884
885
891{
896
901
907
912
917 struct GNUNET_TIME_AbsoluteNBO origin_time;
918
923 struct GNUNET_TIME_RelativeNBO validity_duration;
924};
925
927{
931 unsigned int address_length;
932
933 /* Followed by @e address_length bytes of the address. */
934};
935
945{
950
959
965
972
982
992
997
1001 unsigned int sync_ready;
1002
1007
1012
1013 /* Followed by @e number_of_addresses struct TransportGlobalNattedAddress. */
1014};
1015
1017
1018
1023{
1028
1033
1038
1043
1047 CT_APPLICATION = 4
1049
1050
1056{
1061
1066
1071
1076
1082 RMO_REDUNDANT = 4
1084
1085
1090{
1095
1100
1105
1111};
1112
1113
1119{
1123 uint64_t bytes_sent;
1124
1130};
1131
1132
1137{
1142
1148
1153 unsigned int last_age;
1154};
1155
1156
1160struct TransportClient;
1161
1165struct Neighbour;
1166
1171struct DistanceVector;
1172
1177struct Queue;
1178
1182struct PendingMessage;
1183
1187struct DistanceVectorHop;
1188
1197struct VirtualLink;
1198
1199
1205{
1211
1217
1222
1227
1232
1237 uint16_t total_hops;
1238
1242 unsigned int continue_send;
1243};
1244
1245
1250{
1255
1260};
1261
1262
1267{
1272
1277
1282
1286 uint16_t size;
1287
1294 uint16_t isize;
1295};
1296
1297
1302{
1308
1313
1318
1326 uint8_t *bitfield;
1327
1332
1338
1342 uint16_t msg_size;
1343
1348 uint16_t msg_missing;
1349
1350 /* Followed by @e msg_size bytes of the (partially) defragmented original
1351 * message */
1352
1353 /* Followed by @e bitfield data */
1354};
1355
1356
1366{
1371
1378
1385
1390
1396
1402
1407
1412
1417
1422
1430
1436
1441
1446
1451
1456 unsigned int confirmed;
1457
1461 struct Neighbour *n;
1462
1467
1474
1481
1490
1496
1502
1507
1513
1522
1530
1537
1546
1559
1565
1572
1583
1588 uint32_t fc_seq_gen;
1589
1595 uint32_t last_fc_seq;
1596
1609
1614};
1615
1616
1621{
1627
1633
1640
1647
1654
1661
1668
1675
1680
1686
1692
1697 struct Queue *queue;
1698
1703
1708
1712 unsigned int num_send;
1713};
1714
1715
1720{
1725
1730
1735
1740
1745
1750
1755
1760
1767
1773
1782
1787
1793 unsigned int distance;
1794};
1795
1796
1802{
1807
1812
1817
1822
1828
1834
1839
1844
1849
1854};
1855
1856
1867{
1872
1877
1881 struct Queue *queue;
1882
1887
1891 uint64_t mid;
1892
1897};
1898
1899
1904struct Queue
1905{
1910
1915
1920
1925
1930
1935
1940
1945
1950
1955
1959 const char *address;
1960
1964 unsigned int unlimited_length;
1965
1971
1980
1985
1991
1996 uint64_t mid_gen;
1997
2001 uint32_t qid;
2002
2006 uint32_t mtu;
2007
2012
2017
2021 unsigned int queue_length;
2022
2026 uint64_t q_capacity;
2027
2031 uint32_t priority;
2032
2037
2042
2047 int idle;
2048
2053};
2054
2055
2060{
2065
2071
2077
2082
2087
2093
2099
2105
2111
2117
2122
2127
2132
2137};
2138
2139
2145{
2150
2155
2160
2165};
2166
2167
2171struct PeerRequest
2172{
2177
2182
2187
2194
2199};
2200
2201
2206{
2211
2216
2221
2225 PMT_DV_BOX = 3
2227
2228
2255struct PendingMessage
2256{
2261
2266
2271
2276
2282
2288
2293
2298
2304
2310
2315
2325
2330
2335
2340
2345
2350
2355
2361
2367
2372
2378
2383
2387 uint16_t bytes_msg;
2388
2392 uint16_t frag_off;
2393
2398
2403
2407 uint16_t frag_count;
2408
2413
2414 /* Followed by @e bytes_msg to transmit */
2415};
2416
2417
2422{
2428
2433};
2434
2435
2441{
2446
2451
2458
2463
2469 uint32_t ack_counter;
2470
2474 unsigned int num_acks;
2475};
2476
2477
2482{
2487
2492
2497
2502
2506 const char *address;
2507
2512
2517
2522
2527
2533
2537 uint32_t aid;
2538
2543};
2544
2545
2550{
2555
2560
2565
2570
2575
2576 union
2577 {
2581 struct
2582 {
2588
2594
2598 struct
2599 {
2606
2612
2613
2617 struct
2618 {
2624
2629
2634
2640
2646
2653
2658
2663
2668
2670
2674 struct
2675 {
2683};
2684
2685
2691{
2697
2705
2711
2718 struct GNUNET_TIME_Absolute first_challenge_use;
2719
2726 struct GNUNET_TIME_Absolute last_challenge_use;
2727
2735 struct GNUNET_TIME_Absolute next_challenge;
2736
2745 struct GNUNET_TIME_Relative challenge_backoff;
2746
2751 struct GNUNET_TIME_Relative validation_rtt;
2752
2760 struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2761
2765 struct GNUNET_HashCode hc;
2766
2770 struct GNUNET_SCHEDULER_Task *revalidation_task;
2771
2775 char *address;
2776
2782 struct GNUNET_CONTAINER_HeapNode *hn;
2783
2789
2795 uint32_t last_window_consum_limit;
2796
2801 int awaiting_queue;
2802};
2803
2804
2812{
2817
2822
2827
2832
2838
2843
2849
2855
2861};
2862
2867
2871static unsigned int ring_buffer_head;
2872
2876static unsigned int is_ring_buffer_full;
2877
2882
2886static unsigned int ring_buffer_dv_head;
2887
2891static unsigned int is_ring_buffer_dv_full;
2892
2897
2902
2907
2912
2917
2922
2927
2933
2939
2945
2951
2957
2963
2969
2975
2980
2984static struct LearnLaunchEntry *lle_head = NULL;
2985
2989static struct LearnLaunchEntry *lle_tail = NULL;
2990
2997
3002
3007
3012
3017
3024
3029
3033static unsigned int ir_total;
3034
3038static unsigned long long logging_uuid_gen;
3039
3044
3054
3059static int in_shutdown;
3060
3065
3067
3069
/**
 * Express the current time as a count of 15-minute intervals.
 *
 * The current absolute time value (`abs_value_us`) is divided by the
 * length of one minute (`GNUNET_TIME_UNIT_MINUTES.rel_value_us`) and then
 * by 15; the quotient is converted to `unsigned int` on return.
 *
 * NOTE(review): the line with the function name and parameter list is
 * not present in this listing.
 *
 * @return index of the current 15-minute slot
 */
3080static unsigned int
3082{
3083 struct GNUNET_TIME_Absolute now;
3084
3085 now = GNUNET_TIME_absolute_get ();
 /* minutes since the time origin, grouped into blocks of 15 */
3086 return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
3087}
3088
3089
3095static void
3097{
3099 GNUNET_assert (ir_total > 0);
3100 ir_total--;
3101 if (NULL != ir->nc)
3103 ir->nc = NULL;
3104 GNUNET_free (ir);
3105}
3106
3107
/**
 * Release a pending acknowledgement.
 *
 * The entry is unlinked from each of the three lists it may be a member
 * of (the queue's, the pending message's and the DV hop's), the matching
 * back-reference in @a pa is cleared, and finally @a pa is freed.
 *
 * NOTE(review): the partially visible call right before the final free
 * passes `&pa->ack_uuid.value` and `pa`; presumably this removes the
 * entry from a lookup map -- confirm against the complete source.
 *
 * @param pa the pending acknowledgement to release (declared on a line
 *        that is not present in this listing)
 */
3113static void
3115{
3116 struct Queue *q = pa->queue;
3117 struct PendingMessage *pm = pa->pm;
3118 struct DistanceVectorHop *dvh = pa->dvh;
3119
3121 "free_pending_acknowledgement\n");
3122 if (NULL != q)
3123 {
3124 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
3125 pa->queue = NULL;
3126 }
3127 if (NULL != pm)
3128 {
3130 "remove pa from message\n");
3132 "remove pa from message %" PRIu64 "\n",
3133 pm->logging_uuid);
3135 "remove pa from message %u\n",
3136 pm->pmt);
3138 "remove pa from message %s\n",
3140 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3141 pa->pm = NULL;
3142 }
3143 if (NULL != dvh)
3144 {
3145 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
 /* Clear the DV-hop back-reference; the queue back-reference was
    already handled in the first branch. */
3146 pa->dvh = NULL;
3147 }
3150 &pa->ack_uuid.value,
3151 pa));
3152 GNUNET_free (pa);
3153}
3154
3155
/**
 * Recursively free every fragment stored below @a root.
 *
 * For each fragment on the `head_frag` list of @a root the function first
 * descends into that fragment's own sub-fragments, then detaches (but does
 * not free) all pending acknowledgements still pointing at the fragment,
 * unlinks the fragment from @a root, clears the back-pointer of an
 * associated queue entry, and frees the fragment.  @a root itself is not
 * freed here.
 *
 * @param root message whose fragment tree is released (declared on a line
 *        that is not present in this listing)
 */
3164static void
3166{
3167 struct PendingMessage *frag;
3168
3169 while (NULL != (frag = root->head_frag))
3170 {
3171 struct PendingAcknowledgement *pa;
3172
 /* depth first: release the fragment's own children before the fragment */
3173 free_fragment_tree (frag);
 /* acknowledgements outlive the fragment; only drop their reference to it */
3174 while (NULL != (pa = frag->pa_head))
3175 {
3176 GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
3177 pa->pm = NULL;
3178 }
3179 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
 /* a queue entry may still refer to this fragment; make it forget it */
3180 if (NULL != frag->qe)
3181 {
3182 GNUNET_assert (frag == frag->qe->pm);
3183 frag->qe->pm = NULL;
3184 }
3186 "Free frag %p\n",
3187 frag);
3188 GNUNET_free (frag);
3189 }
3190}
3191
3192
3200static void
3202{
3203 struct TransportClient *tc = pm->client;
3204 struct VirtualLink *vl = pm->vl;
3205 struct PendingAcknowledgement *pa;
3206
3208 "Freeing pm %p\n",
3209 pm);
3210 if (NULL != tc)
3211 {
3213 tc->details.core.pending_msg_head,
3214 tc->details.core.pending_msg_tail,
3215 pm);
3216 }
3217 if ((NULL != vl) && (NULL == pm->frag_parent))
3218 {
3220 "Removing pm %" PRIu64 "\n",
3221 pm->logging_uuid);
3223 vl->pending_msg_head,
3224 vl->pending_msg_tail,
3225 pm);
3226 }
3227 else if (NULL != pm->frag_parent && PMT_DV_BOX != pm->pmt)
3228 {
3229 struct PendingMessage *root = pm->frag_parent;
3230
3231 while (NULL != root->frag_parent && PMT_DV_BOX != root->pmt)
3232 root = root->frag_parent;
3233
3234 root->frag_count--;
3235 }
3236 while (NULL != (pa = pm->pa_head))
3237 {
3238 if (NULL == pa)
3240 "free pending pa null\n");
3241 if (NULL == pm->pa_tail)
3243 "free pending pa_tail null\n");
3244 if (NULL == pa->prev_pa)
3246 "free pending pa prev null\n");
3247 if (NULL == pa->next_pa)
3249 "free pending pa next null\n");
3250 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3251 pa->pm = NULL;
3252 }
3253
3255 if (NULL != pm->qe)
3256 {
3257 GNUNET_assert (pm == pm->qe->pm);
3258 pm->qe->pm = NULL;
3259 }
3260 if (NULL != pm->bpm)
3261 {
3262 free_fragment_tree (pm->bpm);
3263 if (NULL != pm->bpm->qe)
3264 {
3265 struct QueueEntry *qe = pm->bpm->qe;
3266
3267 qe->pm = NULL;
3268 }
3269 GNUNET_free (pm->bpm);
3270 }
3271
3272 GNUNET_free (pm);
3274 "Freeing pm done\n");
3275}
3276
3277
3283static void
3285{
3286 struct VirtualLink *vl = rc->virtual_link;
3287
3291 rc->msg_uuid.uuid,
3292 rc));
3293 GNUNET_free (rc);
3294}
3295
3296
3302static void
3304{
3305 struct VirtualLink *vl = cls;
3306 struct ReassemblyContext *rc;
3307
3308 vl->reassembly_timeout_task = NULL;
3309 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3310 {
3312 .rel_value_us)
3313 {
3315 continue;
3316 }
3321 vl);
3322 return;
3323 }
3324}
3325
3326
/**
 * Iterator callback invoked once per stored reassembly context.
 *
 * NOTE(review): the single statement that acts on `rc` is not present in
 * this listing; presumably it releases the reassembly context -- confirm
 * against the complete source.
 *
 * @param cls unused
 * @param key unused
 * @param value the `struct ReassemblyContext` for this entry
 * @return #GNUNET_OK always, so the iteration continues
 */
3335static int
3336free_reassembly_cb (void *cls, uint32_t key, void *value)
3337{
3338 struct ReassemblyContext *rc = value;
3339
3340 (void) cls;
3341 (void) key;
3343 return GNUNET_OK;
3344}
3345
3346
3352static void
3354{
3355 struct PendingMessage *pm;
3356 struct CoreSentContext *csc;
3357
3359 "free virtual link %p\n",
3360 vl);
3361
3362 if (NULL != vl->reassembly_map)
3363 {
3366 NULL);
3368 vl->reassembly_map = NULL;
3370 vl->reassembly_heap = NULL;
3371 }
3372 if (NULL != vl->reassembly_timeout_task)
3373 {
3376 }
3377 while (NULL != (pm = vl->pending_msg_head))
3381 if (NULL != vl->visibility_task)
3382 {
3384 vl->visibility_task = NULL;
3385 }
3386 if (NULL != vl->fc_retransmit_task)
3387 {
3389 vl->fc_retransmit_task = NULL;
3390 }
3391 while (NULL != (csc = vl->csc_head))
3392 {
3394 GNUNET_assert (vl == csc->vl);
3395 csc->vl = NULL;
3396 }
3397 GNUNET_break (NULL == vl->n);
3398 GNUNET_break (NULL == vl->dv);
3399 GNUNET_free (vl);
3400}
3401
3402
3408static void
3410{
3411 if (NULL != vs->revalidation_task)
3412 {
3413 GNUNET_SCHEDULER_cancel (vs->revalidation_task);
3414 vs->revalidation_task = NULL;
3415 }
3416 /*memcpy (&hkey,
3417 &hc,
3418 sizeof (hkey));*/
3420 "Remove key %s for address %s map size %u contains %u during freeing state\n",
3421 GNUNET_h2s (&vs->hc),
3422 vs->address,
3425 &vs->hc));
3428 GNUNET_YES ==
3431 vs->hn = NULL;
3432 if (NULL != vs->sc)
3433 {
3435 "store cancel\n");
3437 vs->sc = NULL;
3438 }
3439 GNUNET_free (vs->address);
3440 GNUNET_free (vs);
3441}
3442
3443
3450static struct Neighbour *
3452{
3454}
3455
3456
3463static struct VirtualLink *
3465{
3467}
3468
3469
3474{
3481
3486
3491
3496
3501};
3502
3503
3512static void
3514{
3515 struct Neighbour *n = dvh->next_hop;
3516 struct DistanceVector *dv = dvh->dv;
3517 struct PendingAcknowledgement *pa;
3518
3519 while (NULL != (pa = dvh->pa_head))
3520 {
3522 pa->dvh = NULL;
3523 }
3524 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3526 GNUNET_free (dvh);
3527}
3528
3529
3536static void
3537check_link_down (void *cls);
3538
3539
3545static void
3547{
3549 "Informing CORE clients about disconnect from %s\n",
3550 GNUNET_i2s (pid));
3551 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3552 {
3553 struct GNUNET_MQ_Envelope *env;
3554 struct DisconnectInfoMessage *dim;
3555
3556 if (CT_CORE != tc->type)
3557 continue;
3559 dim->peer = *pid;
3560 GNUNET_MQ_send (tc->mq, env);
3561 }
3562}
3563
3564
3571static void
3573{
3574 struct DistanceVectorHop *dvh;
3575
3576 while (NULL != (dvh = dv->dv_head))
3578 if (NULL == dv->dv_head)
3579 {
3580 struct VirtualLink *vl;
3581
3583 GNUNET_YES ==
3585 if (NULL != (vl = dv->vl))
3586 {
3587 GNUNET_assert (dv == vl->dv);
3588 vl->dv = NULL;
3589 if (NULL == vl->n)
3590 {
3592 free_virtual_link (vl);
3593 }
3594 else
3595 {
3598 }
3599 dv->vl = NULL;
3600 }
3601
3602 if (NULL != dv->timeout_task)
3603 {
3605 dv->timeout_task = NULL;
3606 }
3607 GNUNET_free (dv->km);
3608 GNUNET_free (dv);
3609 }
3610}
3611
3612
3626static void
3628 const struct GNUNET_PeerIdentity *peer,
3629 const char *address,
3631 const struct MonitorEvent *me)
3632{
3633 struct GNUNET_MQ_Envelope *env;
3635 size_t addr_len = strlen (address) + 1;
3636
3638 addr_len,
3640 md->nt = htonl ((uint32_t) nt);
3641 md->peer = *peer;
3642 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3643 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3644 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3645 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3646 md->cs = htonl ((uint32_t) me->cs);
3647 md->num_msg_pending = htonl (me->num_msg_pending);
3648 md->num_bytes_pending = htonl (me->num_bytes_pending);
3649 memcpy (&md[1], address, addr_len);
3650 GNUNET_MQ_send (tc->mq, env);
3651}
3652
3653
3663static void
3665 const char *address,
3667 const struct MonitorEvent *me)
3668{
3669 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3670 {
3671 if (CT_MONITOR != tc->type)
3672 continue;
3673 if (tc->details.monitor.one_shot)
3674 continue;
3675 if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3676 (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3677 continue;
3679 }
3680}
3681
3682
3692static void *
3694 struct GNUNET_SERVICE_Client *client,
3695 struct GNUNET_MQ_Handle *mq)
3696{
3697 struct TransportClient *tc;
3698
3699 (void) cls;
3700 tc = GNUNET_new (struct TransportClient);
3701 tc->client = client;
3702 tc->mq = mq;
3705 "Client %p of type %u connected\n",
3706 tc,
3707 tc->type);
3708 return tc;
3709}
3710
3711
3712static enum GNUNET_GenericReturnValue
3714 const struct GNUNET_PeerIdentity *pid,
3715 void *value)
3716{
3717 (void) cls;
3718 struct TransportGlobalNattedAddress *tgna = value;
3719
3720 GNUNET_free (tgna);
3721
3722 return GNUNET_OK;
3723}
3724
3725
/**
 * Release all state kept for a neighbour and free the neighbour itself.
 *
 * The neighbour must no longer own any queue (asserted).  All distance
 * vector hops that use this neighbour as next hop are walked, and a DV
 * route is freed once it has no hops left.  A pending peerstore store
 * operation is cancelled, and the neighbour is detached from its virtual
 * link.
 *
 * NOTE(review): several statements (the calls at the elided lines inside
 * the hop loop, the `get` branch and both arms of the virtual-link
 * branch) are not present in this listing.  Whatever the `else` arm does
 * with the still-alive virtual link, it runs after `vl->n` has been set
 * to NULL, so any code it triggers must cope with a link that has no
 * neighbour.
 *
 * @param neighbour the neighbour to free; must have an empty queue list
 */
3731static void
3732free_neighbour (struct Neighbour *neighbour)
3733{
3734 struct DistanceVectorHop *dvh;
3735 struct VirtualLink *vl;
3736
3737 GNUNET_assert (NULL == neighbour->queue_head);
3740 &neighbour->pid,
3741 neighbour));
3743 "Freeing neighbour\n");
3746 NULL);
 /* drop every DV hop routed via this neighbour */
3748 while (NULL != (dvh = neighbour->dv_head))
3749 {
3750 struct DistanceVector *dv = dvh->dv;
3751
 /* a route with no remaining hops is useless; release it as well */
3753 if (NULL == dv->dv_head)
3754 free_dv_route (dv);
3755 }
3756 if (NULL != neighbour->get)
3757 {
3759 neighbour->get = NULL;
3760 }
3761 if (NULL != neighbour->sc)
3762 {
3764 "store cancel\n");
3765 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3766 neighbour->sc = NULL;
3767 }
3768 if (NULL != (vl = neighbour->vl))
3769 {
3770 GNUNET_assert (neighbour == vl->n);
 /* from here on the virtual link has no direct neighbour */
3771 vl->n = NULL;
3772 if (NULL == vl->dv)
3773 {
3776 }
3777 else
3778 {
3781 }
3782 neighbour->vl = NULL;
3783 }
3784 GNUNET_free (neighbour);
3785}
3786
3787
3794static void
3796 const struct GNUNET_PeerIdentity *pid)
3797{
3798 struct GNUNET_MQ_Envelope *env;
3799 struct ConnectInfoMessage *cim;
3800
3801 GNUNET_assert (CT_CORE == tc->type);
3803 cim->id = *pid;
3804 GNUNET_MQ_send (tc->mq, env);
3805}
3806
3807
3813static void
3815{
3817 "Informing CORE clients about connection to %s\n",
3818 GNUNET_i2s (pid));
3819 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3820 {
3821 if (CT_CORE != tc->type)
3822 continue;
3824 }
3825}
3826
3827
3835static void
3836transmit_on_queue (void *cls);
3837
3838
/**
 * Check whether some other queue should be preferred over @a queue.
 *
 * Walks the list starting at @a queue_head along `next_client` and looks
 * for a queue whose communicator `address_prefix` differs from that of
 * @a queue and which has a strictly higher `priority`, a non-zero
 * `q_capacity` and fewer than #QUEUE_LENGTH_LIMIT queued entries.
 *
 * NOTE(review): `address_prefix` is compared by pointer, not by string
 * content.  That distinguishes communicator clients only as long as each
 * client owns its own prefix string; if the intent is "different prefix
 * text", a string comparison is needed -- confirm the intent.
 *
 * NOTE(review): the line with the function name and parameters (`queue`,
 * `queue_head`) is not present in this listing.
 *
 * @return #GNUNET_YES if such a queue exists, #GNUNET_NO otherwise
 */
3842static unsigned int
3844{
3845 for (struct Queue *s = queue_head; NULL != s;
3846 s = s->next_client)
3847 {
 /* only queues whose communicator prefix pointer differs are candidates */
3848 if (s->tc->details.communicator.address_prefix !=
3849 queue->tc->details.communicator.address_prefix)
3850 {
3852 "queue address %s qid %u compare with queue: address %s qid %u\n",
3853 queue->address,
3854 queue->qid,
3855 s->address,
3856 s->qid);
 /* higher priority alone is not enough: the candidate must also be usable */
3857 if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3858 (QUEUE_LENGTH_LIMIT > s->queue_length) )
3859 return GNUNET_YES;
3861 "Lower prio\n");
3862 }
3863 }
3864 return GNUNET_NO;
3865}
3866
3867
3875static void
3877 struct Queue *queue,
3879{
3881
3882 if (queue->validated_until.abs_value_us < now.abs_value_us)
3883 return;
3885 queue->tc->details.communicator.
3886 queue_head))
3887 return;
3888
3889 if (queue->tc->details.communicator.total_queue_length >=
3891 {
3893 "Transmission on queue %s (QID %u) throttled due to communicator queue limit\n",
3894 queue->address,
3895 queue->qid);
3897 GST_stats,
3898 "# Transmission throttled due to communicator queue limit",
3899 1,
3900 GNUNET_NO);
3901 queue->idle = GNUNET_NO;
3902 return;
3903 }
3904 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3905 {
3907 "Transmission on queue %s (QID %u) throttled due to communicator queue length limit\n",
3908 queue->address,
3909 queue->qid);
3911 "# Transmission throttled due to queue queue limit",
3912 1,
3913 GNUNET_NO);
3914 queue->idle = GNUNET_NO;
3915 return;
3916 }
3917 if (0 == queue->q_capacity)
3918 {
3920 "Transmission on queue %s (QID %u) throttled due to communicator message has capacity %"
3921 PRIu64 ".\n",
3922 queue->address,
3923 queue->qid,
3924 queue->q_capacity);
3926 "# Transmission throttled due to message queue capacity",
3927 1,
3928 GNUNET_NO);
3929 queue->idle = GNUNET_NO;
3930 return;
3931 }
3932 /* queue might indeed be ready, schedule it */
3933 if (NULL != queue->transmit_task)
3934 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3935 queue->transmit_task =
3937 queue);
3939 "Considering transmission on queue `%s' QID %llu to %s\n",
3940 queue->address,
3941 (unsigned long long) queue->qid,
3942 GNUNET_i2s (&queue->neighbour->pid));
3943}
3944
3945
/**
 * Re-evaluate whether a virtual link still has a usable path.
 *
 * Clears the link's `visibility_task`, then determines the latest
 * `path_valid_until` over all distance-vector hops and the latest
 * `validated_until` over all queues of the direct neighbour.  A side whose
 * latest timeout has already passed is detached from the link.  If neither
 * a DV route nor a neighbour remains, the link is freed; otherwise the
 * check is scheduled again for the later of the two timeouts.
 *
 * The link may legitimately have no direct neighbour (`vl->n == NULL`,
 * e.g. a link that is reachable via distance vector only), so the
 * neighbour's queues are inspected only when a neighbour exists.
 *
 * @param cls the `struct VirtualLink` to check
 */
3952static void
3954{
3955 struct VirtualLink *vl = cls;
3956 struct DistanceVector *dv = vl->dv;
3957 struct Neighbour *n = vl->n;
3958 struct GNUNET_TIME_Absolute dvh_timeout;
3959 struct GNUNET_TIME_Absolute q_timeout;
3960
3962 "Checking if link is down\n");
3963 vl->visibility_task = NULL;
3964 dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3965 if (NULL != dv)
3966 {
3967 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3968 pos = pos->next_dv)
3969 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3970 pos->path_valid_until);
 /* every DV path has expired: detach the route from the link */
3971 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3972 {
3973 vl->dv->vl = NULL;
3974 vl->dv = NULL;
3975 }
3976 }
3977 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
 /* Without a direct neighbour there are no queues to look at, and
    nothing to detach; q_timeout then stays at zero. */
 if (NULL != n)
 {
3978 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3979 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
 /* no queue is validated any more: detach the neighbour from the link */
3980 if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3981 {
3982 vl->n->vl = NULL;
3983 vl->n = NULL;
3984 }
 }
3985 if ((NULL == vl->n) && (NULL == vl->dv))
3986 {
3988 free_virtual_link (vl);
3989 return;
3990 }
 /* still reachable: look again when the longer-lived side expires */
3991 vl->visibility_task =
3992 GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3994 vl);
3995}
3996
3997
4003static void
4005{
4006 struct Neighbour *neighbour = queue->neighbour;
4007 struct TransportClient *tc = queue->tc;
4008 struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
4010 struct QueueEntry *qe;
4011 int maxxed;
4012 struct PendingAcknowledgement *pa;
4013 struct VirtualLink *vl;
4014
4016 "Cleaning up queue %u\n", queue->qid);
4017 if (NULL != queue->mo)
4018 {
4020 queue->mo = NULL;
4021 }
4022 if (NULL != queue->transmit_task)
4023 {
4024 GNUNET_SCHEDULER_cancel (queue->transmit_task);
4025 queue->transmit_task = NULL;
4026 }
4027 while (NULL != (pa = queue->pa_head))
4028 {
4029 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
4030 pa->queue = NULL;
4031 }
4032
4034 neighbour->queue_head,
4035 neighbour->queue_tail,
4036 queue);
4038 tc->details.communicator.queue_head,
4039 tc->details.communicator.queue_tail,
4040 queue);
4042 tc->details.communicator.total_queue_length);
4044 "Cleaning up queue with length %u\n",
4045 queue->queue_length);
4046 while (NULL != (qe = queue->queue_head))
4047 {
4048 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
4049 queue->queue_length--;
4050 tc->details.communicator.total_queue_length--;
4051 if (NULL != qe->pm)
4052 {
4053 GNUNET_assert (qe == qe->pm->qe);
4054 qe->pm->qe = NULL;
4055 }
4056 GNUNET_free (qe);
4057 }
4058 GNUNET_assert (0 == queue->queue_length);
4059 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
4060 tc->details.communicator.total_queue_length))
4061 {
4062 /* Communicator dropped below threshold, resume all _other_ queues */
4064 GST_stats,
4065 "# Transmission throttled due to communicator queue limit",
4066 -1,
4067 GNUNET_NO);
4068 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
4069 s = s->next_client)
4071 s,
4073 }
4074 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
4076
4077 vl = lookup_virtual_link (&neighbour->pid);
4078 if ((NULL != vl) && (neighbour == vl->n))
4079 {
4081 check_link_down (vl);
4082 }
4083 if (NULL == neighbour->queue_head)
4084 {
4085 free_neighbour (neighbour);
4086 }
4087}
4088
4089
4095static void
4097{
4098 struct TransportClient *tc = ale->tc;
4099
4100 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
4101 tc->details.communicator.addr_tail,
4102 ale);
4103 if (NULL != ale->sc)
4104 {
4106 "store cancel\n");
4108 ale->sc = NULL;
4109 }
4110 if (NULL != ale->st)
4111 {
4113 ale->st = NULL;
4114 }
4115 if (NULL != ale->signed_address)
4117 GNUNET_free (ale);
4118}
4119
4120
4129static int
4131 const struct GNUNET_PeerIdentity *pid,
4132 void *value)
4133{
4134 struct TransportClient *tc = cls;
4135 struct PeerRequest *pr = value;
4136
4137 if (NULL != pr->nc)
4139 pr->nc = NULL;
4141 GNUNET_YES ==
4142 GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
4143 pid,
4144 pr));
4145 GNUNET_free (pr);
4146
4147 return GNUNET_OK;
4148}
4149
4150
4151static void
4152do_shutdown (void *cls);
4153
4162static void
4164 struct GNUNET_SERVICE_Client *client,
4165 void *app_ctx)
4166{
4167 struct TransportClient *tc = app_ctx;
4168
4169 (void) cls;
4170 (void) client;
4172 switch (tc->type)
4173 {
4174 case CT_NONE:
4176 "Unknown Client %p disconnected, cleaning up.\n",
4177 tc);
4178 break;
4179
4180 case CT_CORE: {
4182 "CORE Client %p disconnected, cleaning up.\n",
4183 tc);
4184
4185 struct PendingMessage *pm;
4186
4187 while (NULL != (pm = tc->details.core.pending_msg_head))
4188 {
4190 tc->details.core.pending_msg_head,
4191 tc->details.core.pending_msg_tail,
4192 pm);
4193 pm->client = NULL;
4194 }
4195 }
4196 break;
4197
4198 case CT_MONITOR:
4200 "MONITOR Client %p disconnected, cleaning up.\n",
4201 tc);
4202
4203 break;
4204
4205 case CT_COMMUNICATOR: {
4207 "COMMUNICATOR Client %p disconnected, cleaning up.\n",
4208 tc);
4209
4210 struct Queue *q;
4211 struct AddressListEntry *ale;
4212
4213 if (NULL != tc->details.communicator.free_queue_entry_task)
4215 tc->details.communicator.free_queue_entry_task);
4216 while (NULL != (q = tc->details.communicator.queue_head))
4217 free_queue (q);
4218 while (NULL != (ale = tc->details.communicator.addr_head))
4220 GNUNET_free (tc->details.communicator.address_prefix);
4221 }
4222 break;
4223
4224 case CT_APPLICATION:
4226 "APPLICATION Client %p disconnected, cleaning up.\n",
4227 tc);
4228
4229 GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
4231 tc);
4232 GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
4233 break;
4234 }
4235 GNUNET_free (tc);
4236 if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
4237 {
4239 "Our last client disconnected\n");
4240 do_shutdown (cls);
4241 }
4242}
4243
4244
4254static int
4256 const struct GNUNET_PeerIdentity *pid,
4257 void *value)
4258{
4259 struct TransportClient *tc = cls;
4260 struct VirtualLink *vl = value;
4261
4262 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4263 return GNUNET_OK;
4264
4266 "Telling new CORE client about existing connection to %s\n",
4267 GNUNET_i2s (pid));
4269 return GNUNET_OK;
4270}
4271
4272
4278static void
4280 unsigned
4281 int free_cmc);
4282
4283static enum GNUNET_GenericReturnValue
4285 const struct GNUNET_PeerIdentity *pid,
4286 void *value)
4287{
4288 struct VirtualLink *vl = value;
4289 struct CommunicatorMessageContext *cmc;
4290
4291 /* resume communicators */
4292 while (NULL != (cmc = vl->cmc_tail))
4293 {
4295 if (GNUNET_NO == cmc->continue_send)
4297 }
4298 return GNUNET_OK;
4299}
4300
4301
/**
 * Handle the start message of a client that wants to act as CORE.
 *
 * The request is rejected when bit 0 of the options is set and the peer
 * identity supplied by the client differs from #GST_my_identity, or when
 * the client already has a type other than #CT_NONE.  Otherwise the
 * client's type becomes #CT_CORE.
 *
 * NOTE(review): the statements that follow each `GNUNET_break (0)` and
 * the calls after the type assignment are only partially present in this
 * listing (arguments `tc` and `NULL` remain); their callees cannot be
 * determined from here.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param start the start message
 */
4310static void
4311handle_client_start (void *cls, const struct StartMessage *start)
4312{
4313 struct TransportClient *tc = cls;
4314 uint32_t options;
4315
4316 options = ntohl (start->options);
 /* bit 0 set: the client asks us to verify the identity it assumes */
4317 if ((0 != (1 & options)) &&
4318 (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4319 {
4320 /* client thinks this is a different peer, reject */
4321 GNUNET_break (0);
4323 return;
4324 }
 /* a client may pick its role only once */
4325 if (CT_NONE != tc->type)
4326 {
4327 GNUNET_break (0);
4329 return;
4330 }
4331 tc->type = CT_CORE;
4333 "New CORE client with PID %s registered\n",
4334 GNUNET_i2s (&start->self));
4337 tc);
4340 NULL);
4342}
4343
4344
4351static int
4352check_client_send (void *cls, const struct OutboundMessage *obm)
4353{
4354 struct TransportClient *tc = cls;
4355 uint16_t size;
4356 const struct GNUNET_MessageHeader *obmm;
4357
4358 if (CT_CORE != tc->type)
4359 {
4360 GNUNET_break (0);
4361 return GNUNET_SYSERR;
4362 }
4363 size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4364 if (size < sizeof(struct GNUNET_MessageHeader))
4365 {
4366 GNUNET_break (0);
4367 return GNUNET_SYSERR;
4368 }
4369 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4370 if (size != ntohs (obmm->size))
4371 {
4372 GNUNET_break (0);
4373 return GNUNET_SYSERR;
4374 }
4375 return GNUNET_OK;
4376}
4377
4378
4386static void
4388{
4389 struct TransportClient *tc = pm->client;
4390 struct VirtualLink *vl = pm->vl;
4391
4393 "client send response\n");
4394 if (NULL != tc)
4395 {
4396 struct GNUNET_MQ_Envelope *env;
4397 struct SendOkMessage *so_msg;
4398
4400 so_msg->peer = vl->target;
4402 "Confirming transmission of <%" PRIu64 "> to %s\n",
4403 pm->logging_uuid,
4404 GNUNET_i2s (&vl->target));
4405 GNUNET_MQ_send (tc->mq, env);
4406 }
4408}
4409
4410
/**
 * Select up to @a hops_array_length distance-vector hops of a route.
 *
 * Hops are weighted by `MAX_DV_HOPS_ALLOWED - distance`, so shorter paths
 * are more likely to be chosen.  Unless `RMO_UNCONFIRMED_ALLOWED` is set
 * in the options, hops whose `path_valid_until` has already passed are
 * not eligible.  If all eligible hops fit into @a hops_array they are
 * returned directly; otherwise distinct random positions in the weight
 * range are drawn and the hops covering those positions are returned.
 *
 * The eligibility filter is applied in every pass over the hop list,
 * including the "everything fits" fast path, so that the number of
 * entries written never exceeds the number of eligible hops counted (and
 * therefore never exceeds @a hops_array_length), and ineligible hops are
 * never handed back.
 *
 * NOTE(review): the lines with the function name and the first parameters
 * (`dv`, `options`) and the line performing the random draw are not
 * present in this listing.
 *
 * @param hops_array[out] receives the selected hops
 * @param hops_array_length capacity of @a hops_array
 * @return number of entries written to @a hops_array
 */
4420static unsigned int
4423 struct DistanceVectorHop **hops_array,
4424 unsigned int hops_array_length)
4425{
4426 uint64_t choices[hops_array_length];
4427 uint64_t num_dv;
4428 unsigned int dv_count;
4429
4430 /* Pick random vectors, but weighted by distance, giving more weight
4431 to shorter vectors */
4432 num_dv = 0;
4433 dv_count = 0;
4434 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4435 pos = pos->next_dv)
4436 {
4437 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4438 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4439 .rel_value_us == 0))
4440 continue; /* pos unconfirmed and confirmed required */
4441 num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4442 dv_count++;
4443 }
4444 if (0 == dv_count)
4445 return 0;
4446 if (dv_count <= hops_array_length)
4447 {
 /* all eligible hops fit: return exactly those, in list order */
4448 dv_count = 0;
4449 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4450 pos = pos->next_dv)
 {
 /* same eligibility rule as in the counting pass above; without it
    ineligible hops would be returned and the array could overflow */
 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
 .rel_value_us == 0))
 continue; /* pos unconfirmed and confirmed required */
4451 hops_array[dv_count++] = pos;
 }
4452 return dv_count;
4453 }
 /* draw hops_array_length pairwise distinct positions */
4454 for (unsigned int i = 0; i < hops_array_length; i++)
4455 {
4456 int ok = GNUNET_NO;
4457 while (GNUNET_NO == ok)
4458 {
4459 choices[i] =
4461 ok = GNUNET_YES;
4462 for (unsigned int j = 0; j < i; j++)
4463 if (choices[i] == choices[j])
4464 {
4465 ok = GNUNET_NO;
4466 break;
4467 }
4468 }
4469 }
4470 dv_count = 0;
4471 num_dv = 0;
 /* map each drawn position to the hop whose weight interval contains it */
4472 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4473 pos = pos->next_dv)
4474 {
4475 uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4476
4477 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4478 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4479 .rel_value_us == 0))
4480 continue; /* pos unconfirmed and confirmed required */
4481 for (unsigned int i = 0; i < hops_array_length; i++)
4482 if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4483 hops_array[dv_count++] = pos;
4484 num_dv += delta;
4485 }
4486 return dv_count;
4487}
4488
4489
/**
 * Validation step for a message in which a client announces itself as a
 * communicator. Besides validating, it marks the client as
 * CT_COMMUNICATOR.
 *
 * NOTE(review): listing lines 4497 (function name), 4499 (second
 * parameter, referenced as `cam`) and 4513 are elided.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @return #GNUNET_OK if accepted, #GNUNET_SYSERR if the client already
 *         has a type
 */
4496static int
4498 void *cls,
4500{
4501 struct TransportClient *tc = cls;
4502 uint16_t size;
4503
/* A client whose type is no longer CT_NONE has already chosen a role. */
4504 if (CT_NONE != tc->type)
4505 {
4506 GNUNET_break (0);
4507 return GNUNET_SYSERR;
4508 }
4509 tc->type = CT_COMMUNICATOR;
/* Number of bytes that follow the fixed-size part of the message. */
4510 size = ntohs (cam->header.size) - sizeof(*cam);
4511 if (0 == size)
4512 return GNUNET_OK; /* receive-only communicator */
/* NOTE(review): line 4513 is elided; presumably the trailing bytes are
   validated there - confirm against the real source. */
4514 return GNUNET_OK;
4515}
4516
4517
/**
 * Conclude the handling of a message that arrived from a communicator.
 * If the incoming message had flow control switched on, an
 * acknowledgement carrying its flow-control id and sender is queued to
 * the communicator; optionally the context is released afterwards.
 *
 * NOTE(review): listing lines 4524 (function name and first parameter,
 * referenced as `cmc`), 4532, 4534, 4537 and 4544 are elided, so the
 * declaration/allocation of `ack` and the statement at 4544 are not
 * visible here.
 *
 * @param free_cmc #GNUNET_YES to free `cmc` before returning
 */
4523static void
4525 unsigned
4526 int free_cmc)
4527{
4528 if (0 != ntohl (cmc->im.fc_on))
4529 {
4530 /* send ACK when done to communicator for flow control! */
4531 struct GNUNET_MQ_Envelope *env;
4533
4535 "Acknowledge message with flow control id %" PRIu64 "\n",
4536 cmc->im.fc_id);
4538 ack->reserved = htonl (0);
/* fc_id and sender are copied verbatim from the incoming message. */
4539 ack->fc_id = cmc->im.fc_id;
4540 ack->sender = cmc->im.neighbour_sender;
4541 GNUNET_MQ_send (cmc->tc->mq, env);
4542 }
4543
4545
4546 if (GNUNET_YES == free_cmc)
4547 {
4548 GNUNET_free (cmc);
4549 }
4550}
4551
4552
/**
 * NOTE(review): in this listing both the line with the function name and
 * parameters (4554) and the only body line (4556) are elided, so nothing
 * about this function's behaviour can be documented from here.
 */
4553static void
4555{
4557}
4558
4559
/**
 * Handle a RECV_OK message from a CORE client: enlarge the CORE receive
 * window of the virtual link to the named peer and, once the window is
 * positive, walk the link's queued communicator message contexts.
 *
 * NOTE(review): listing lines 4580, 4586, 4590, 4595, 4598, 4604 and 4606
 * are elided; in particular the statements inside the resume loop are not
 * visible here.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param rom the message, naming the peer and the window increase
 */
4569static void
4570handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4571{
4572 struct TransportClient *tc = cls;
4573 struct VirtualLink *vl;
4574 uint32_t delta;
4575 struct CommunicatorMessageContext *cmc;
4576
/* Only CORE clients may send this message. */
4577 if (CT_CORE != tc->type)
4578 {
4579 GNUNET_break (0);
4581 return;
4582 }
4583 vl = lookup_virtual_link (&rom->peer);
/* Unknown or unconfirmed link: count the event and drop the message. */
4584 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4585 {
4587 "# RECV_OK dropped: virtual link unknown",
4588 1,
4589 GNUNET_NO);
4591 return;
4592 }
4593 delta = ntohl (rom->increase_window_delta);
4594 vl->core_recv_window += delta;
4596 "CORE ack receiving message, increased CORE recv window to %d\n",
4597 vl->core_recv_window);
/* Still no room in the window: nothing to resume yet. */
4599 if (vl->core_recv_window <= 0)
4600 return;
4601 /* resume communicators */
4602 while (NULL != (cmc = vl->cmc_tail))
4603 {
4605 if (GNUNET_NO == cmc->continue_send)
4607 }
4608}
4609
4610
/**
 * Process a communicator's announcement: unless it is receive-only (no
 * trailing bytes), remember its address prefix, its characteristics and
 * whether it can burst.
 *
 * NOTE(review): listing lines 4618 (function name), 4620 (second
 * parameter, referenced as `cam`), 4628, 4635, 4638, 4640 and 4644 are
 * elided.
 *
 * @param cls the `struct TransportClient` of the communicator
 */
4617static void
4619 void *cls,
4621{
4622 struct TransportClient *tc = cls;
4623 uint16_t size;
4624
/* Bytes following the fixed-size part of the message. */
4625 size = ntohs (cam->header.size) - sizeof(*cam);
4626 if (0 == size)
4627 {
4629 "Receive-only communicator connected\n");
4630 return; /* receive-only communicator */
4631 }
/* The trailing bytes are treated as a C string and duplicated.
   NOTE(review): relies on 0-termination having been validated before
   this handler runs - confirm. */
4632 tc->details.communicator.address_prefix =
4633 GNUNET_strdup ((const char *) &cam[1]);
4634 tc->details.communicator.cc =
4636 tc->details.communicator.can_burst
4637 = (enum GNUNET_GenericReturnValue) ntohl (cam->can_burst);
4639 "Communicator for peer %s with prefix '%s' connected %s\n",
4641 tc->details.communicator.address_prefix,
4642 tc->details.communicator.can_burst ? "can burst" :
4643 "can not burst");
4645}
4646
4647
/**
 * Validate a backchannel request: after the fixed header the message
 * must carry an embedded message followed by at least one byte of
 * 0-terminated string.
 *
 * NOTE(review): listing lines 4656 (function name) and 4658 (second
 * parameter, referenced as `cb`) are elided.
 *
 * @param cls unused
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
4655static int
4657 void *cls,
4659{
4660 const struct GNUNET_MessageHeader *inbox;
4661 const char *is;
4662 uint16_t msize;
4663 uint16_t isize;
4664
4665 (void) cls;
/* msize: bytes available after the fixed-size part of the message. */
4666 msize = ntohs (cb->header.size) - sizeof(*cb);
4667 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
/* NOTE(review): inbox->size is read before checking that msize covers a
   full struct GNUNET_MessageHeader, and isize itself is not checked
   against that minimum. Confirm the caller guarantees this, otherwise a
   short message is read out of bounds here. */
4668 isize = ntohs (inbox->size);
/* The embedded message must leave at least one byte for the string. */
4669 if (isize >= msize)
4670 {
4671 GNUNET_break (0);
4672 return GNUNET_SYSERR;
4673 }
4674 is = (const char *) inbox;
4675 is += isize;
4676 msize -= isize;
4677 GNUNET_assert (0 < msize);
/* The remainder must end in a NUL byte. */
4678 if ('\0' != is[msize - 1])
4679 {
4680 GNUNET_break (0);
4681 return GNUNET_SYSERR;
4682 }
4683 return GNUNET_OK;
4684}
4685
4686
/**
 * Fill in an `EphemeralConfirmationPS` for the target of `dv`, update
 * dv->ephemeral_validity and store a signature in dv->sender_sig.
 *
 * NOTE(review): most of this function is elided in the listing (lines
 * 4693, 4697, 4699-4700, 4702-4703, 4705), including the function name,
 * the value assigned to the validity and the signing call itself. `dv` is
 * presumably the sole parameter.
 */
4692static void
4694{
4695 struct EphemeralConfirmationPS ec;
4696
4698 dv->ephemeral_validity =
4701 ec.target = dv->target;
4704 ec.purpose.size = htonl (sizeof(ec));
4706 &ec,
4707 &dv->sender_sig);
4708}
4709
4710
/* Forward declaration. NOTE(review): the line with the name and first
   parameter (4712) is elided in this listing; judging by the later call
   `free_queue_entry (pos, tc)` this is presumably that function. */
4711static void
4713 struct TransportClient *tc);
4714
4715
/**
 * Walk all queues of a communicator client and release queue entries
 * that the (elided) age test considers timed out. Also clears the
 * client's free_queue_entry_task handle, so this is presumably run as
 * that task.
 *
 * NOTE(review): listing lines 4717 (function name/parameters), 4720,
 * 4722, 4731, 4739, 4742, 4744-4745 and 4747 are elided; the declaration
 * of `now`, the computation of the age and the timeout condition are not
 * visible here.
 */
4716static void
4718{
4719 struct TransportClient *tc = cls;
4721
4723 "freeing timedout queue entries\n");
4724
4725 tc->details.communicator.free_queue_entry_task = NULL;
4726 for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
4727 queue = queue->next_client)
4728 {
4729 struct QueueEntry *qep = queue->queue_head;
4730
4732 "checking QID %u for timedout queue entries\n",
4733 queue->qid);
4734 while (NULL != qep)
4735 {
4736 struct QueueEntry *pos = qep;
4737
/* Advance before `pos` may be freed below. */
4738 qep = qep->next;
4740 pos->creation_timestamp, now);
4741
4743 "diff to now %s \n",
4746 {
4748 "Freeing timed out QueueEntry with MID %" PRIu64
4749 " and QID %u\n",
4750 pos->mid,
4751 queue->qid);
4752 free_queue_entry (pos, tc);
4753 }
4754 }
4755 }
4756}
4757
4758
/**
 * Wrap @a payload into a send request for the communicator owning
 * `queue`, record a `QueueEntry` for it and hand the envelope to the
 * communicator's message queue. If the queue has no capacity left, the
 * request is dropped instead.
 *
 * NOTE(review): listing lines 4769 (function name and first parameter,
 * referenced as `queue`), 4775, 4780, 4789, 4801, 4816, 4831, 4847, 4852,
 * 4866-4868 and 4878 are elided; among them the declaration of `smt` and
 * the scheduling call for the cleanup task.
 *
 * @param pm pending message this transmission belongs to, may be NULL
 * @param payload bytes to transmit
 * @param payload_size number of bytes in @a payload
 */
4768static void
4770 struct PendingMessage *pm,
4771 const void *payload,
4772 size_t payload_size)
4773{
4774 struct Neighbour *n = queue->neighbour;
4776 struct GNUNET_MQ_Envelope *env;
4777 struct PendingAcknowledgement *pa;
4778
4779 GNUNET_log (
4781 "Queueing %u bytes of payload for transmission <%" PRIu64
4782 "> on queue %llu to %s\n",
4783 (unsigned int) payload_size,
4784 (NULL == pm) ? 0 : pm->logging_uuid,
4785 (unsigned long long) queue->qid,
4786 GNUNET_i2s (&queue->neighbour->pid));
4787 env = GNUNET_MQ_msg_extra (smt,
4788 payload_size,
/* The request carries the queue id, a per-queue message id and the
   receiver, followed by the raw payload. */
4790 smt->qid = htonl (queue->qid);
4791 smt->mid = GNUNET_htonll (queue->mid_gen);
4792 smt->receiver = n->pid;
4793 memcpy (&smt[1], payload, payload_size);
4794 {
4795 /* Pass the env to the communicator of queue for transmission. */
4796 struct QueueEntry *qe;
4797
4798 qe = GNUNET_new (struct QueueEntry);
4799 qe->creation_timestamp = GNUNET_TIME_absolute_get ();
4800 qe->mid = queue->mid_gen;
4802 "Create QueueEntry with MID %" PRIu64
4803 " and QID %u and prefix %s\n",
4804 qe->mid,
4805 queue->qid,
4806 queue->tc->details.communicator.address_prefix);
4807 queue->mid_gen++;
4808 qe->queue = queue;
4809 if (NULL != pm)
4810 {
4811 qe->pm = pm;
4812 // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4813 // GNUNET_assert (NULL == pm->qe);
/* A pending message points at one queue entry at a time: detach it from
   the previous entry before linking the new one. */
4814 if (NULL != pm->qe)
4815 {
4817 "Retransmitting message <%" PRIu64
4818 "> remove pm from qe with MID: %llu \n",
4819 pm->logging_uuid,
4820 (unsigned long long) pm->qe->mid);
4821 pm->qe->pm = NULL;
4822 }
4823 pm->qe = qe;
4824 }
4825 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
/* No capacity: undo the link to pm and discard envelope and entry.
   NOTE(review): `env` is an MQ envelope released here with GNUNET_free;
   confirm this is the intended way to dispose of an unsent envelope.
   Also note that a previous entry's pm link was already cleared above,
   so after this path no queue entry refers to pm any more. */
4826 if (0 == queue->q_capacity)
4827 {
4828 // Messages without FC or fragments can get here.
4829 if (NULL != pm)
4830 {
4832 "Message %" PRIu64
4833 " (pm type %u) was not send because queue has no capacity.\n",
4834 pm->logging_uuid,
4835 pm->pmt);
4836 pm->qe = NULL;
4837 }
4838 GNUNET_free (env);
4839 GNUNET_free (qe);
4840 return;
4841 }
4842 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4843 queue->queue_length++;
4844 queue->tc->details.communicator.total_queue_length++;
/* Capacity is only consumed on queues that are not unlimited. */
4845 if (GNUNET_NO == queue->unlimited_length)
4846 queue->q_capacity--;
4848 "Queue %s with qid %u has capacity %" PRIu64 "\n",
4849 queue->address,
4850 queue->qid,
4851 queue->q_capacity);
/* Any of the following limits being hit marks the queue as not idle. */
4853 queue->tc->details.communicator.total_queue_length)
4854 queue->idle = GNUNET_NO;
4855 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4856 queue->idle = GNUNET_NO;
4857 if (0 == queue->q_capacity)
4858 queue->idle = GNUNET_NO;
4859
/* When the queue is busy, make sure the communicator's cleanup task for
   queue entries is scheduled (only one at a time). */
4860 if (GNUNET_NO == queue->idle)
4861 {
4862 struct TransportClient *tc = queue->tc;
4863
4864 if (NULL == tc->details.communicator.free_queue_entry_task)
4865 tc->details.communicator.free_queue_entry_task =
4867 &
4869 tc);
4870 }
/* Count this transmission on the pending acknowledgement of pm.
   NOTE(review): the walk has no NULL check on `pa`; it relies on an
   entry with pa->pm == pm being present in the list - confirm. */
4871 if (NULL != pm && NULL != (pa = pm->pa_head))
4872 {
4873 while (pm != pa->pm)
4874 pa = pa->next_pa;
4875 pa->num_send++;
4876 }
4877 // GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
4879 "Sending message MID %" PRIu64
4880 " of type %u (%u) and size %lu with MQ %p queue %s (QID %u) pending %"
4881 PRIu64 "\n",
4882 GNUNET_ntohll (smt->mid),
4883 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4884 ntohs (smt->header.size),
4885 (unsigned long) payload_size,
4886 queue->tc->mq,
4887 queue->address,
4888 queue->qid,
4889 (NULL == pm) ? 0 : pm->logging_uuid);
4890 GNUNET_MQ_send (queue->tc->mq, env);
4891 }
4892}
4893
4894
/**
 * Send @a hdr directly to neighbour `n` over one of its queues, or over
 * two when RMO_REDUNDANT is requested. Only queues that are currently
 * validated qualify, unless RMO_UNCONFIRMED_ALLOWED is set.
 *
 * NOTE(review): listing lines 4906 (function name and first parameter,
 * referenced as `n`), 4908 (last parameter, referenced as `options`),
 * 4934, 4938, 4942, 4945-4946, 4950 and 4960 are elided. The
 * initialisation of `rtt`, the selection of `sel1`/`sel2` and the
 * statement ending the "no candidates" branch are therefore not visible
 * here.
 *
 * @param hdr message to send
 * @return smallest aged RTT among the queues used (initial value elided)
 */
4905static struct GNUNET_TIME_Relative
4907 const struct GNUNET_MessageHeader *hdr,
4909{
4910 struct GNUNET_TIME_Absolute now;
4911 unsigned int candidates;
4912 unsigned int sel1;
4913 unsigned int sel2;
4914 struct GNUNET_TIME_Relative rtt;
4915
4916 /* Pick one or two 'random' queues from n (under constraints of options) */
4917 now = GNUNET_TIME_absolute_get ();
4918 /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4919 weight in the future; weight could be assigned by observed
4920 bandwidth (note: not sure if we should do this for this type
4921 of control traffic though). */
4922 candidates = 0;
/* First pass: count the queues that qualify. */
4923 for (struct Queue *pos = n->queue_head; NULL != pos;
4924 pos = pos->next_neighbour)
4925 {
4926 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4927 (pos->validated_until.abs_value_us > now.abs_value_us))
4928 candidates++;
4929 }
4930 if (0 == candidates)
4931 {
4932 /* This can happen rarely if the last confirmed queue timed
4933 out just as we were beginning to process this message. */
4935 "Could not route message of type %u to %s: no valid queue\n",
4936 ntohs (hdr->type),
4937 GNUNET_i2s (&n->pid));
4939 "# route selection failed (all no valid queue)",
4940 1,
4941 GNUNET_NO);
4943 }
4944
/* Without redundancy the second selector is set to an index no queue can
   have, so it never matches. */
4947 if (0 == (options & RMO_REDUNDANT))
4948 sel2 = candidates; /* picks none! */
4949 else
4951 candidates = 0;
/* Second pass: re-enumerate the qualifying queues and transmit on those
   whose running index equals sel1 or sel2. */
4952 for (struct Queue *pos = n->queue_head; NULL != pos;
4953 pos = pos->next_neighbour)
4954 {
4955 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4956 (pos->validated_until.abs_value_us > now.abs_value_us))
4957 {
4958 if ((sel1 == candidates) || (sel2 == candidates))
4959 {
4961 "Routing message of type %u to %s using %s (#%u)\n",
4962 ntohs (hdr->type),
4963 GNUNET_i2s (&n->pid),
4964 pos->address,
4965 (sel1 == candidates) ? 1 : 2);
4966 rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4967 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4968 }
4969 candidates++;
4970 }
4971 }
4972 return rtt;
4973}
4974
4975
/**
 * Key state used for distance-vector encryption: a libgcrypt cipher
 * handle plus the derived key material.
 *
 * NOTE(review): the struct tag line (4979) and several member lines are
 * elided in this listing (among them, judging by the use of
 * `material.hmac_key` elsewhere, an HMAC key member and the name
 * `material` of the nested struct).
 */
4980{
/* Cipher handle used for encryption and decryption. */
4984 gcry_cipher_hd_t cipher;
4985
/* Derived key material. */
4989 struct
4990 {
4995
/* 256-bit AES key. */
4999 char aes_key[256 / 8];
5000
/* 128-bit initial counter for CTR mode. */
5004 char aes_ctr[128 / 8];
5006};
5007
5008
/**
 * Expand key material from `km` and @a iv into @a key and open an
 * AES-256 cipher in CTR mode with the derived key and counter. Failures
 * of the libgcrypt open/setkey calls abort via GNUNET_assert.
 *
 * NOTE(review): listing lines 5018 (function name and first parameter,
 * referenced as `km`), 5023 and 5035 are elided; how the result of the
 * HKDF expansion is checked is not visible here.
 *
 * @param iv initialisation vector fed into the key derivation
 * @param[out] key key state to initialise
 */
5017static void
5019 const struct GNUNET_ShortHashCode *iv,
5020 struct DVKeyState *key)
5021{
5022 /* must match what we derive from decapsulated key */
5024 GNUNET_CRYPTO_hkdf_expand (&key->material,
5025 sizeof(key->material),
5026 km,
5027 "gnunet-transport-dv-key",
5028 strlen ("gnunet-transport-dv-key")
5029 ,
5030 km,
5031 sizeof(*km),
5032 iv,
5033 sizeof(*iv),
5034 NULL));
5036 "Deriving backchannel key based on KM %s and IV %s\n",
5037 GNUNET_sh2s (km),
5038 GNUNET_sh2s (iv));
5039 GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
5040 GCRY_CIPHER_AES256 /* low level: go for speed */
5041 ,
5042 GCRY_CIPHER_MODE_CTR,
5043 0 /* flags */));
5044 GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
5045 &key->material.aes_key,
5046 sizeof(key->material.aes_key)));
/* NOTE(review): unlike open/setkey, the result of setctr is not checked. */
5047 gcry_cipher_setctr (key->cipher,
5048 &key->material.aes_ctr,
5049 sizeof(key->material.aes_ctr));
5050}
5051
5052
/**
 * Compute an HMAC over @a data with the HMAC key held in @a key.
 *
 * @param key key state providing `material.hmac_key`
 * @param hmac passed to GNUNET_CRYPTO_hmac as the result location
 * @param data bytes to authenticate
 * @param data_size number of bytes in @a data
 */
5062static void
5063dv_hmac (const struct DVKeyState *key,
5064 struct GNUNET_HashCode *hmac,
5065 const void *data,
5066 size_t data_size)
5067{
5068 GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
5069}
5070
5071
/**
 * Encrypt @a in_size bytes from @a in into @a dst using the cipher handle
 * in @a key. A libgcrypt error aborts via GNUNET_assert.
 *
 * @param key key state holding the cipher handle
 * @param in plaintext input
 * @param dst output buffer, must hold at least @a in_size bytes
 * @param in_size number of bytes to encrypt
 */
5081static void
5082dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
5083{
5084 GNUNET_assert (0 ==
5085 gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
5086}
5087
5088
/**
 * Decrypt @a out_size bytes from @a ciph into @a out using the cipher
 * handle in `key`. Unlike the encryption helper, a libgcrypt failure is
 * reported to the caller instead of aborting.
 *
 * NOTE(review): listing line 5100 (function name and first parameter,
 * referenced as `key`) is elided.
 *
 * @param out output buffer, must hold at least @a out_size bytes
 * @param ciph ciphertext input
 * @param out_size number of bytes to decrypt
 * @return #GNUNET_OK on success, #GNUNET_SYSERR if libgcrypt failed
 */
5099static enum GNUNET_GenericReturnValue
5101 void *out,
5102 const void *ciph,
5103 size_t out_size)
5104{
5105 return (0 ==
5106 gcry_cipher_decrypt (key->cipher,
5107 out, out_size,
5108 ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
5109}
5110
5111
/**
 * Release the cipher handle of `key` and wipe its key material. The
 * memory of `key` itself is not freed here.
 *
 * NOTE(review): listing line 5118 (function name and parameter,
 * referenced as `key`) is elided.
 */
5117static void
5119{
5120 gcry_cipher_close (key->cipher);
5121 GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
5122}
5123
5124
/**
 * Callback type invoked once per selected DV path with the boxed message
 * and the first hop to send it to.
 *
 * NOTE(review): the line with the last parameter (5138) is elided in this
 * listing; call sites pass routing options as the fourth argument.
 *
 * @param cls closure
 * @param next_hop neighbour to hand the message to
 * @param hdr the message to send
 */
5135typedef void (*DVMessageHandler) (void *cls,
5136 struct Neighbour *next_hop,
5137 const struct GNUNET_MessageHeader *hdr,
5139
/**
 * Encrypt @a hdr once for the target of `dv`, then build one DV box
 * message per selected path (box header, hop list, encrypted body) and
 * pass each to @a use.
 *
 * NOTE(review): listing lines 5155 (function name and first parameter,
 * referenced as `dv`), 5161 (parameter referenced as `options`), 5174,
 * 5191, 5207, 5233 and 5242 are elided; among them the box header type,
 * the IV generation call, the initialisation of `rtt` and of the log
 * string `path`.
 *
 * @param num_dvhs number of entries in @a dvhs
 * @param dvhs paths to send over
 * @param hdr message to encapsulate
 * @param use callback that transmits each boxed message
 * @param use_cls closure for @a use
 * @param without_fc stored (in network byte order) in the box header
 * @return smallest aged RTT among the paths used (initial value elided)
 */
5154static struct GNUNET_TIME_Relative
5156 unsigned int num_dvhs,
5157 struct DistanceVectorHop **dvhs,
5158 const struct GNUNET_MessageHeader *hdr,
5159 DVMessageHandler use,
5160 void *use_cls,
5162 enum GNUNET_GenericReturnValue without_fc)
5163{
5164 struct TransportDVBoxMessage box_hdr;
5165 struct TransportDVBoxPayloadP payload_hdr;
5166 uint16_t enc_body_size = ntohs (hdr->size);
/* Stack buffer for the encrypted payload header plus message body. */
5167 char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
5168 struct DVKeyState *key;
5169 struct GNUNET_TIME_Relative rtt;
5170 struct GNUNET_ShortHashCode km;
5171
5172 key = GNUNET_new (struct DVKeyState);
5173 /* Encrypt payload */
5175 box_hdr.total_hops = htons (0);
5176 box_hdr.without_fc = htons (without_fc);
5177 // update_ephemeral (dv);
/* Ephemeral key expired: encapsulate a fresh one for the target and
   re-sign it.
   NOTE(review): dv->km is overwritten with a new allocation without a
   visible free of the previous value - confirm there is no leak. */
5178 if (0 ==
5179 GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
5180 {
5181 GNUNET_CRYPTO_eddsa_kem_encaps (&dv->target.public_key,
5182 &dv->ephemeral_key,
5183 &km);
5184 dv->km = GNUNET_new (struct GNUNET_ShortHashCode);
5185 GNUNET_memcpy (dv->km, &km, sizeof(struct GNUNET_ShortHashCode));
5186 sign_ephemeral (dv);
5187 }
5188 box_hdr.ephemeral_key = dv->ephemeral_key;
5189 payload_hdr.sender_sig = dv->sender_sig;
5190
5192 &box_hdr.iv,
5193 sizeof(box_hdr.iv));
5194 // We are creating this key, so this must work.
5195 // FIXME: Possibly also add return values here. We are processing
5196 // Input from other peers...
5197 dv_setup_key_state_from_km (dv->km, &box_hdr.iv, key);
5198 payload_hdr.sender = GST_my_identity;
5199 payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
/* Payload header and message body go through the same cipher handle, in
   this order, into adjacent parts of `enc`; the HMAC covers all of it. */
5200 dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
5201 dv_encrypt (key,
5202 hdr,
5203 &enc[sizeof(struct TransportDVBoxPayloadP)],
5204 enc_body_size);
5205 dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
5206 dv_key_clean (key);
5208 /* For each selected path, take the pre-computed header and body
5209 and add the path in the middle of the message; then send it. */
5210 for (unsigned int i = 0; i < num_dvhs; i++)
5211 {
5212 struct DistanceVectorHop *dvh = dvhs[i];
/* The hop list holds the path plus the final target. */
5213 unsigned int num_hops = dvh->distance + 1;
5214 char buf[sizeof(struct TransportDVBoxMessage)
5215 + sizeof(struct GNUNET_PeerIdentity) * num_hops
5216 + sizeof(struct TransportDVBoxPayloadP)
5217 + enc_body_size] GNUNET_ALIGN;
5218 struct GNUNET_PeerIdentity *dhops;
5219
/* NOTE(review): sizeof(buf) is stored in 16-bit fields; confirm callers
   bound the message size so this cannot truncate. */
5220 box_hdr.header.size = htons (sizeof(buf));
5221 box_hdr.orig_size = htons (sizeof(buf));
5222 box_hdr.num_hops = htons (num_hops);
5223 memcpy (buf, &box_hdr, sizeof(box_hdr));
5224 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
5225 memcpy (dhops,
5226 dvh->path,
5227 dvh->distance * sizeof(struct GNUNET_PeerIdentity));
5228 dhops[dvh->distance] = dv->target;
5229 if (GNUNET_EXTRA_LOGGING > 0)
5230 {
5231 char *path;
5232
5234 for (unsigned int j = 0; j < num_hops; j++)
5235 {
5236 char *tmp;
5237
5238 GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
5239 GNUNET_free (path);
5240 path = tmp;
5241 }
5243 "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
5244 ntohs (hdr->type),
5245 GNUNET_i2s (&dv->target),
5246 i + 1,
5247 num_dvhs,
5248 path);
5249 GNUNET_free (path);
5250 }
5251 rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
/* The encrypted blob follows directly after the hop list. */
5252 memcpy (&dhops[num_hops], enc, sizeof(enc));
5253 use (use_cls,
5254 dvh->next_hop,
5255 (const struct GNUNET_MessageHeader *) buf,
5256 options);
/* NOTE(review): `key` is allocated once before the loop but freed here
   inside it. With num_dvhs > 1 this looks like a double free, with
   num_dvhs == 0 like a leak. Confirm and move the free after the loop. */
5257 GNUNET_free (key);
5258 }
5259 return rtt;
5260}
5261
5262
/**
 * Callback in the shape of #DVMessageHandler that forwards @a hdr to
 * @a next_hop via route_via_neighbour(), always with
 * RMO_UNCONFIRMED_ALLOWED and ignoring the returned RTT.
 *
 * NOTE(review): listing lines 5273 (function name and first parameter,
 * referenced as `cls`) and 5276 (last parameter) are elided. Whatever the
 * last parameter carries is not used in the visible body.
 *
 * @param next_hop neighbour to send to
 * @param hdr message to send
 */
5272static void
5274 struct Neighbour *next_hop,
5275 const struct GNUNET_MessageHeader *hdr,
5277{
5278 (void) cls;
5279 (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
5280}
5281
5282
/**
 * Route @a hdr to the target of virtual link `vl` without flow control,
 * using the direct neighbour, a distance-vector path, or both (when
 * RMO_REDUNDANT is set and both are available).
 *
 * NOTE(review): listing lines 5295 (function name and first parameter,
 * referenced as `vl`), 5298 (parameter referenced as `options`), 5307,
 * 5316, 5326, 5330, 5334, 5338, 5340, 5349, 5357-5358, 5361, 5372, 5378,
 * 5382, 5388 and 5390 are elided; among them the initial values of
 * `rtt1`/`rtt2`, the coin flip and the call that picks the DV hops.
 *
 * @param hdr message to route
 * @return minimum of the RTTs reported by the routes used
 */
5294static struct GNUNET_TIME_Relative
5296// route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
5297 const struct GNUNET_MessageHeader *hdr,
5299{
5300 // struct VirtualLink *vl;
5301 struct Neighbour *n;
5302 struct DistanceVector *dv;
5303 struct GNUNET_TIME_Relative rtt1;
5304 struct GNUNET_TIME_Relative rtt2;
/* NOTE(review): `vl` is used here, before the assertion below that it is
   non-NULL. */
5305 const struct GNUNET_PeerIdentity *target = &vl->target;
5306
5308 "Trying to route message of type %u to %s without fc\n",
5309 ntohs (hdr->type),
5310 GNUNET_i2s (target));
5311
5312 // TODO Do this elsewhere. vl should be given as parameter to method.
5313 // vl = lookup_virtual_link (target);
5314 GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
/* NOTE(review): unreachable after the assertion above (its body, line
   5316, is elided in this listing). */
5315 if (NULL == vl)
5317 n = vl->n;
5318 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
5319 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
5320 {
5321 /* if confirmed is required, and we do not have anything
5322 confirmed, drop respective options */
5323 if (NULL == n)
5324 n = lookup_neighbour (target);
5325 if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
5327 }
5328 if ((NULL == n) && (NULL == dv))
5329 {
5331 "Cannot route message of type %u to %s: no route\n",
5332 ntohs (hdr->type),
5333 GNUNET_i2s (target));
5335 "# Messages dropped in routing: no acceptable method",
5336 1,
5337 GNUNET_NO);
5339 }
5341 "Routing message of type %u to %s with options %X\n",
5342 ntohs (hdr->type),
5343 GNUNET_i2s (target),
5344 (unsigned int) options);
5345 /* If both dv and n are possible and we must choose:
5346 flip a coin for the choice between the two; for now 50/50 */
5347 if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5348 {
5350 n = NULL;
5351 else
5352 dv = NULL;
5353 }
5354 if ((NULL != n) && (NULL != dv))
5355 options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5356 enough for redundancy, so clear the flag. */
5359 if (NULL != n)
5360 {
5362 "Try to route message of type %u to %s without fc via neighbour\n",
5363 ntohs (hdr->type),
5364 GNUNET_i2s (target));
5365 rtt1 = route_via_neighbour (n, hdr, options);
5366 }
5367 if (NULL != dv)
5368 {
/* At most two DV paths are ever requested: one normally, two when
   redundancy is still wanted. */
5369 struct DistanceVectorHop *hops[2];
5370 unsigned int res;
5371
5373 options,
5374 hops,
5375 (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5376 if (0 == res)
5377 {
5379 "Failed to route message, could not determine DV path\n");
5380 return rtt1;
5381 }
5383 "encapsulate_for_dv 1\n");
5384 rtt2 = encapsulate_for_dv (dv,
5385 res,
5386 hops,
5387 hdr,
5389 NULL,
5391 GNUNET_YES);
5392 }
5393 return GNUNET_TIME_relative_min (rtt1, rtt2);
5394}
5395
5396
/* Forward declaration; needed because the retransmission task defined
   right below calls this function before its definition. */
5397static void
5398consider_sending_fc (void *cls);
5399
/**
 * Clears the virtual link's fc_retransmit_task handle and then calls
 * consider_sending_fc() for the link; presumably run as that task.
 *
 * NOTE(review): listing line 5407 (function name and parameter,
 * referenced as `cls`) is elided.
 */
5406static void
5408{
5409 struct VirtualLink *vl = cls;
5410 vl->fc_retransmit_task = NULL;
5411 consider_sending_fc (cls);
5412}
5413
5414
/* Forward declaration; the definition is outside this part of the file.
   Callers in this section free the returned string with GNUNET_free. */
5415static char *
5416get_address_without_port (const char *address);
5417
5418
/**
 * Iteration context used while serialising a neighbour's global NATed
 * addresses into one contiguous buffer.
 *
 * NOTE(review): the struct tag line (5419) is elided in this listing.
 */
5420{
/* Current write offset into `tgnas`, in bytes. */
5421 size_t off;
/* Destination buffer; sized by the caller. */
5422 char *tgnas;
5423};
5424
5425
/**
 * Map iterator: append one `TransportGlobalNattedAddress` record (fixed
 * part plus its address bytes) to the buffer in the context and advance
 * the write offset.
 *
 * NOTE(review): listing lines 5427 (function name and first parameter,
 * referenced as `cls`), 5435, 5440 and 5443 are elided. No bounds check
 * against the buffer size is visible; the caller is presumably expected
 * to have sized the buffer for all entries - confirm.
 *
 * @param pid key of the map entry (not used in the visible lines)
 * @param value the `struct TransportGlobalNattedAddress` to append
 * @return #GNUNET_OK
 */
5426static enum GNUNET_GenericReturnValue
5428 const struct GNUNET_PeerIdentity *pid,
5429 void *value)
5430{
5431 struct AddGlobalAddressesContext *ctx = cls;
5432 struct TransportGlobalNattedAddress *tgna = value;
/* The address bytes directly follow the fixed-size record. */
5433 char *addr = (char *) &tgna[1];
5434
5436 "sending address %s length %u\n",
5437 addr,
5438 ntohl (tgna->address_length));
5439 GNUNET_memcpy (&(ctx->tgnas[ctx->off]), tgna, sizeof (struct
5441 + ntohl (tgna->address_length));
5442 ctx->off += sizeof(struct TransportGlobalNattedAddress) + ntohl (tgna->
5444
5445 return GNUNET_OK;
5446}
5447
5448
/* Forward declaration; the definition is outside this part of the file.
   Used below to obtain the RTT value advertised in flow-control
   messages for links that have a distance vector. */
5449static struct GNUNET_TIME_Relative
5450calculate_rtt (struct DistanceVector *dv);
5451
5452
/**
 * Build a flow-control message for a virtual link, appending the
 * neighbour's global NATed addresses when it has any, and send it.
 * Afterwards a retransmission task is (re)scheduled.
 *
 * NOTE(review): many lines are elided in this listing (5460 function
 * name/parameters, 5465, 5476, 5487, 5490, 5498, 5513, 5520, 5523,
 * 5525-5529, 5531, 5534-5535, 5538-5539, 5547-5548, 5550, 5554), among
 * them the declarations of `duration` and `ctx`, most field assignments
 * of the message, the actual send and the scheduling call. The comments
 * below only describe what is visible.
 */
5459static void
5461{
5462 struct VirtualLink *vl = cls;
5463 struct GNUNET_TIME_Absolute monotime;
5464 struct TransportFlowControlMessage *fc;
5466 struct GNUNET_TIME_Relative rtt;
5467 struct GNUNET_TIME_Relative rtt_average;
5468 struct Neighbour *n = vl->n;
5469
5470 if (NULL != n && 0 < n->number_of_addresses)
5471 {
/* One fixed-size record per address plus the total length of all
   address strings. */
5472 size_t addresses_size =
5473 n->number_of_addresses * sizeof (struct TransportGlobalNattedAddress) + n
5474 ->size_of_global_addresses;
5475 char *tgnas = GNUNET_malloc (addresses_size);
5477 ctx.off = 0;
5478 ctx.tgnas = tgnas;
5479
5480 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage)
5481 + addresses_size);
/* NOTE(review): the total is stored in a 16-bit size field; no bound on
   addresses_size is visible here. */
5482 fc->header.size = htons (sizeof(struct TransportFlowControlMessage)
5483 + addresses_size);
5484 fc->size_of_addresses = htonl (n->size_of_global_addresses);
5485 fc->number_of_addresses = htonl (n->number_of_addresses);
/* Serialise all addresses into `tgnas`, then copy them behind the
   fixed-size part of the message. */
5486 GNUNET_CONTAINER_multipeermap_iterate (n->natted_addresses,
5488 &ctx);
5489 GNUNET_memcpy (&fc[1], tgnas, addresses_size);
5491 }
5492 else
5493 {
5494 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage));
5495 fc->header.size = htons (sizeof(struct TransportFlowControlMessage));
5496 }
5497
5499 /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5500 it always! */
5501 /* For example, we should probably ONLY do this if a bit more than
5502 an RTT has passed, or if the window changed "significantly" since
5503 then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5504 need an estimate for the bandwidth-delay-product for the entire
5505 VL, as that determines "significantly". We have the delay, but
5506 the bandwidth statistics need to be added for the VL!*/(void) duration;
5507
/* Without a distance vector there is no RTT estimate to advertise. */
5508 if (NULL != vl->dv)
5509 rtt_average = calculate_rtt (vl->dv);
5510 else
5511 rtt_average = GNUNET_TIME_UNIT_FOREVER_REL;
5512 fc->rtt = GNUNET_TIME_relative_hton (rtt_average);
5514 "Sending FC seq %u to %s with new window %llu %lu %u\n",
5515 (unsigned int) vl->fc_seq_gen,
5516 GNUNET_i2s (&vl->target),
5517 (unsigned long long) vl->incoming_fc_window_size,
5518 (unsigned long) rtt_average.rel_value_us,
5519 vl->sync_ready);
5521 vl->last_fc_transmission = monotime;
5522 fc->sync_ready = vl->sync_ready;
5524 fc->seq = htonl (vl->fc_seq_gen++);
5530 fc->sender_time = GNUNET_TIME_absolute_hton (monotime);
/* An RTT of "forever" is treated as a failed transmission. */
5532 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5533 {
5536 "FC retransmission to %s failed, will retry in %s\n",
5537 GNUNET_i2s (&vl->target),
5540 }
5541 else
5542 {
5543 /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5544 vl->last_fc_rtt = rtt;
5545 }
5546 if (NULL != vl->fc_retransmit_task)
5549 {
5551 vl->fc_retransmit_count = 0;
5552 }
5553 vl->fc_retransmit_task =
5555 vl->fc_retransmit_count++;
5556 GNUNET_free (fc);
5557}
5558
5559
/**
 * Look for a pending message on virtual link `vl` that is not yet in a
 * queue and that fits the outbound flow-control window, then notify
 * idle, validated queues (direct ones first, then those of DV next hops)
 * that there is something to send.
 *
 * NOTE(review): listing lines 5577 (function name and parameter,
 * referenced as `vl`), 5585, 5594, 5599, 5601, 5608-5609, 5612, 5625,
 * 5628, 5630, 5634, 5660, 5678, 5681, 5683, 5687, 5695 and 5702 are
 * elided; in particular the calls that actually notify/schedule a queue
 * are not visible here.
 */
5576static void
5578{
5579 struct Neighbour *n = vl->n;
5580 struct DistanceVector *dv = vl->dv;
5581 struct GNUNET_TIME_Absolute now;
5582 struct VirtualLink *vl_next_hop;
5583 int elig;
5584
5586 "check_vl_transmission to target %s\n",
5587 GNUNET_i2s (&vl->target));
5588 /* Check that we have an eligible pending message!
5589 (cheaper than having #transmit_on_queue() find out!) */
5590 elig = GNUNET_NO;
/* The loop body ends in an unconditional `break`, so only the first
   pending message that is not already in a queue is examined. */
5591 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5592 pm = pm->next_vl)
5593 {
5595 "check_vl_transmission loop\n");
5596 if (NULL != pm->qe)
5597 continue; /* not eligible, is in a queue! */
5598 if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5600 {
5602 "Stalled message %" PRIu64
5603 " transmission on VL %s due to flow control: %llu < %llu\n",
5604 pm->logging_uuid,
5605 GNUNET_i2s (&vl->target),
5606 (unsigned long long) vl->outbound_fc_window_size,
5607 (unsigned long long) (pm->bytes_msg
5610 return; /* We have a message, but flow control says "nope" */
5611 }
5613 "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5614 GNUNET_i2s (&vl->target));
5615 /* Notify queues at direct neighbours that we are interested */
5616 now = GNUNET_TIME_absolute_get ();
5617 if (NULL != n)
5618 {
5619 for (struct Queue *queue = n->queue_head; NULL != queue;
5620 queue = queue->next_neighbour)
5621 {
/* Only idle queues whose validation has not expired are used. */
5622 if ((GNUNET_YES == queue->idle) &&
5623 (queue->validated_until.abs_value_us > now.abs_value_us))
5624 {
5626 "Direct neighbour %s not stalled\n",
5627 GNUNET_i2s (&n->pid));
5629 queue,
5631 elig = GNUNET_YES;
5632 }
5633 else
5635 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5636 queue->qid,
5637 queue->idle);
5638 }
5639 }
5640 /* Notify queues via DV that we are interested */
5641 if (NULL != dv)
5642 {
5643 /* Do DV with lower scheduler priority, which effectively means that
5644 IF a neighbour exists and is available, we prefer it. */
5645 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5646 pos = pos->next_dv)
5647 {
5648 struct Neighbour *nh_iter = pos->next_hop;
5649
5650
5651 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5652 continue; /* skip this one: path not validated */
5653 else
5654 {
/* The first hop of a DV path must itself have a virtual link, and its
   flow-control window must have room for the message as well. */
5655 vl_next_hop = lookup_virtual_link (&nh_iter->pid);
5656 GNUNET_assert (NULL != vl_next_hop);
5657 if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5658 vl_next_hop->outbound_fc_window_size)
5659 {
5661 "Stalled message %" PRIu64
5662 " transmission on next hop %s due to flow control: %llu < %llu\n",
5663 pm->logging_uuid,
5664 GNUNET_i2s (&vl_next_hop->target),
5665 (unsigned long
5666 long) vl_next_hop->outbound_fc_window_size,
5667 (unsigned long long) (pm->bytes_msg
5668 + vl_next_hop->
5669 outbound_fc_window_size_used));
/* Send a flow-control message on the stalled next-hop link. */
5670 consider_sending_fc (vl_next_hop);
5671 continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5672 }
5673 for (struct Queue *queue = nh_iter->queue_head; NULL != queue;
5674 queue = queue->next_neighbour)
5675 if ((GNUNET_YES == queue->idle) &&
5676 (queue->validated_until.abs_value_us > now.abs_value_us))
5677 {
5679 "Next hop neighbour %s not stalled\n",
5680 GNUNET_i2s (&nh_iter->pid));
5682 queue,
5684 elig = GNUNET_YES;
5685 }
5686 else
5688 "DV Queue QID: %u (%u) busy or invalid\n",
5689 queue->qid,
5690 queue->idle);
5691 }
5692 }
5693 }
5694 if (GNUNET_YES == elig)
5696 "Eligible message %" PRIu64 " of size %u to %s: %llu/%llu\n",
5697 pm->logging_uuid,
5698 pm->bytes_msg,
5699 GNUNET_i2s (&vl->target),
5700 (unsigned long long) vl->outbound_fc_window_size,
5701 (unsigned long long) (pm->bytes_msg
5703 break;
5704 }
5705}
5706
5707
/**
 * Handle a send request from a CORE client: copy the embedded message
 * into a new `PendingMessage` and attach it to the client and to the
 * virtual link of the target peer. Requests for unknown or unconfirmed
 * links are dropped and counted.
 *
 * NOTE(review): listing lines 5722, 5731, 5738-5739, 5747, 5757, 5764,
 * 5768 and 5772-5773 are elided, among them the declaration of `pp`, the
 * list-insertion macro names and the final statements of the function.
 *
 * @param cls the `struct TransportClient` that sent the request
 * @param obm the request; the message to transmit follows it
 */
5714static void
5715handle_client_send (void *cls, const struct OutboundMessage *obm)
5716{
5717 struct TransportClient *tc = cls;
5718 struct PendingMessage *pm;
5719 const struct GNUNET_MessageHeader *obmm;
5720 uint32_t bytes_msg;
5721 struct VirtualLink *vl;
5723
5724 GNUNET_assert (CT_CORE == tc->type);
/* The message to transmit directly follows the fixed-size request. */
5725 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5726 bytes_msg = ntohs (obmm->size);
5727 pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5728 vl = lookup_virtual_link (&obm->peer);
5729 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5730 {
5732 "Don't have %s as a neighbour (anymore).\n",
5733 GNUNET_i2s (&obm->peer));
5734 /* Failure: don't have this peer as a neighbour (anymore).
5735 Might have gone down asynchronously, so this is NOT
5736 a protocol violation by CORE. Still count the event,
5737 as this should be rare. */
5740 "# messages dropped (neighbour unknown)",
5741 1,
5742 GNUNET_NO);
5743 return;
5744 }
5745
/* The payload is stored inline, right after the PendingMessage. */
5746 pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5748 "1 created pm %p storing vl %p\n",
5749 pm,
5750 vl);
5751 pm->logging_uuid = logging_uuid_gen++;
5752 pm->prefs = pp;
5753 pm->client = tc;
5754 pm->vl = vl;
5755 pm->bytes_msg = bytes_msg;
5756 memcpy (&pm[1], obmm, bytes_msg);
5758 "Sending message of type %u with %u bytes as <%" PRIu64
5759 "> to %s\n",
5760 ntohs (obmm->type),
5761 bytes_msg,
5762 pm->logging_uuid,
5763 GNUNET_i2s (&obm->peer));
/* The message is linked into two lists: the client's and the link's. */
5765 tc->details.core.pending_msg_head,
5766 tc->details.core.pending_msg_tail,
5767 pm);
5769 vl->pending_msg_head,
5770 vl->pending_msg_tail,
5771 pm);
5774}
5775
5776
/**
 * Handle a communicator's request to deliver a message to a
 * communicator at another peer: wrap the embedded message and the
 * 0-terminated target string into a backchannel encapsulation message
 * and route it, via the confirmed virtual link if there is one and
 * otherwise via a direct neighbour.
 *
 * NOTE(review): listing lines 5787 (function name), 5789 (second
 * parameter, referenced as `cb`), 5802-5804, 5808, 5816, 5827, 5834 and
 * 5839 are elided; among them the declaration of `be`, the message type
 * and the routing calls themselves.
 *
 * @param cls the `struct TransportClient` of the requesting communicator
 */
5786static void
5788 void *cls,
5790{
5791 struct Neighbour *n;
5792 struct VirtualLink *vl;
5793 struct TransportClient *tc = cls;
5794 const struct GNUNET_MessageHeader *inbox =
5795 (const struct GNUNET_MessageHeader *) &cb[1];
5796 uint16_t isize = ntohs (inbox->size);
/* The target communicator string follows the embedded message. */
5797 const char *is = ((const char *) &cb[1]) + isize;
5798 size_t slen = strlen (is) + 1;
/* Stack buffer for the whole encapsulated message. */
5799 char
5800 mbuf[slen + isize
5801 + sizeof(struct
5805
5806 /* 0-termination of 'is' was checked already in
5807 #check_communicator_backchannel() */
5809 "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5810 GNUNET_i2s (&cb->pid),
5811 is,
5812 ntohs (inbox->type),
5813 ntohs (inbox->size));
5814 /* encapsulate and encrypt message */
5815 be->header.type =
5817 be->header.size = htons (sizeof(mbuf));
/* Layout: encapsulation header, embedded message, target string. */
5818 memcpy (&be[1], inbox, isize);
5819 memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5820 + isize],
5821 is,
5822 strlen (is) + 1);
5823 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5824 vl = lookup_virtual_link (&cb->pid);
5825 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5826 {
5828 }
5829 else
5830 {
5831 /* Use route via neighbour */
5832 n = lookup_neighbour (&cb->pid);
/* Without a neighbour either, the message is not sent on this path. */
5833 if (NULL != n)
5835 n,
5836 &be->header,
5837 RMO_NONE);
5838 }
5840}
5841
5842
/**
 * Validate an "add address" message: the sender must be registered as a
 * communicator.
 *
 * NOTE(review): listing lines 5851 (function name and first parameter,
 * referenced as `cls`) and 5861 are elided; a later comment in this file
 * says the 0-termination of the address is checked in this function,
 * presumably on line 5861.
 *
 * @param aam the message to validate
 * @return #GNUNET_OK if accepted, #GNUNET_SYSERR if the client is not a
 *         communicator
 */
5850static int
5852 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5853{
5854 struct TransportClient *tc = cls;
5855
5856 if (CT_COMMUNICATOR != tc->type)
5857 {
5858 GNUNET_break (0);
5859 return GNUNET_SYSERR;
5860 }
5862 return GNUNET_OK;
5863}
5864
5865
/* Forward declaration; the peerstore callbacks below re-schedule this
   function before its definition. */
5871static void
5872store_pi (void *cls);
5873
5874
/**
 * Completion callback for storing one of our own addresses in the
 * peerstore: clears the pending store handle, logs the outcome and
 * schedules the next run of store_pi() for the entry.
 *
 * NOTE(review): listing lines 5888, 5892 and 5898 are elided, including
 * the scheduling call and the expression that is divided by 4 to obtain
 * the delay.
 *
 * @param cls the `struct AddressListEntry` that was stored
 * @param success #GNUNET_YES if the store operation succeeded
 */
5881static void
5882peerstore_store_own_cb (void *cls, int success)
5883{
5884 struct AddressListEntry *ale = cls;
5885
5886 ale->sc = NULL;
5887 if (GNUNET_YES != success)
5889 "Failed to store our own address `%s' in peerstore!\n",
5890 ale->address);
5891 else
5893 "Successfully stored our own address `%s' in peerstore!\n",
5894 ale->address);
5895 /* refresh period is 1/4 of expiration time, that should be plenty
5896 without being excessive. */
/* The refresh is scheduled regardless of whether the store succeeded. */
5897 ale->st =
5899 4ULL),
5900 &store_pi,
5901 ale);
5902}
5903
5904
/**
 * Continuation that starts storing the signed address of an entry in the
 * peerstore under the "transport" sub-system; if no store handle is
 * returned, store_pi() is scheduled again for the entry.
 *
 * NOTE(review): listing lines 5909, 5911-5912, 5914-5915, 5919-5920,
 * 5924 and 5927 are elided, among them the declaration/derivation of
 * `expiration`, the store call head and the scheduling call.
 * `success` is not used in the visible lines.
 *
 * @param cls the `struct AddressListEntry` to store
 * @param success result of the preceding operation
 */
5905static void
5906shc_cont (void *cls, int success)
5907{
5908 struct AddressListEntry *ale = cls;
5910
5913 "transport",
5916 ale->signed_address,
5917 ale->signed_address_len,
5918 expiration,
5921 ale);
5922 if (NULL == ale->sc)
5923 {
5925 "Failed to store our address `%s' with peerstore\n",
5926 ale->address);
5928 &store_pi,
5929 ale);
5930 }
5931}
5932
5933
/**
 * Publish one of our own addresses: turn the stored address into a
 * "prefix://rest" URI, add it via an (elided) call whose result is kept
 * in `add_success`, sign it and pass the resulting message on, with
 * shc_cont() as continuation.
 *
 * NOTE(review): listing lines 5947, 5957, 5959, 5962-5963, 5967, 5977,
 * 5982, 5984-5985, 5989-5993 and 5995 are elided, among them the origin
 * of `prefix`, `env` and `msg` and the names of the calls involved.
 *
 * @param cls the `struct AddressListEntry` to publish
 */
5939static void
5940store_pi (void *cls)
5941{
5942 struct AddressListEntry *ale = cls;
5943 struct GNUNET_MQ_Envelope *env;
5944 const struct GNUNET_MessageHeader *msg;
5945 const char *dash;
5946 char *address_uri;
5948 unsigned int add_success;
5949
/* The stored address must contain a '-'; the part after the first one
   becomes the URI body. */
5950 dash = strchr (ale->address, '-');
5951 GNUNET_assert (NULL != dash);
5952 dash++;
5953 GNUNET_asprintf (&address_uri,
5954 "%s://%s",
5955 prefix,
5956 dash);
5958 ale->st = NULL;
5960 "Storing our address `%s' in peerstore until %s!\n",
5961 ale->address,
5964 address_uri);
/* Anything but GNUNET_OK ends the function here; GNUNET_NO is logged as
   "not done", other values as "failed". */
5965 if (GNUNET_OK != add_success)
5966 {
5968 "Storing our address `%s' %s\n",
5969 address_uri,
5970 GNUNET_NO == add_success ? "not done" : "failed");
5971 GNUNET_free (address_uri);
5972 return;
5973 }
5974 else
5975 {
5976
5978 "Storing our address `%s'\n",
5979 address_uri);
5980 }
5981 // FIXME hello_mono_time used here?? What about expiration in ale?
5983 ale->nt,
5986 &ale->signed_address,
5987 &ale->signed_address_len);
5988 GNUNET_free (address_uri);
5994 "store_pi 1\n");
5996 msg,
5997 shc_cont,
5998 ale);
/* NOTE(review): `env` is an MQ envelope released with GNUNET_free; how it
   was obtained is elided - confirm this is the intended disposal. */
5999 GNUNET_free (env);
6000}
6001
6002
/**
 * Allocate an `AddressListEntry` with the address string stored inline
 * behind it and, unless the address without its port is "127.0.0.1",
 * schedule store_pi() for it right away.
 *
 * NOTE(review): listing lines 6004-6006 (function name and the leading
 * parameters, referenced as `tc`, `expiration` and `nt`) and 6022 are
 * elided.
 *
 * @param address address string to copy into the entry
 * @param aid address identifier, stored as given
 * @param slen number of bytes to copy from @a address; the entry's
 *        address is later used as a C string, so this presumably
 *        includes the terminating NUL - confirm
 * @return the new entry
 */
6003static struct AddressListEntry *
6007 const char *address,
6008 uint32_t aid,
6009 size_t slen)
6010{
6011 struct AddressListEntry *ale;
6012 char *address_without_port;
6013
6014 ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
6015 ale->tc = tc;
/* ale->address points into the same allocation, right behind the struct. */
6016 ale->address = (const char *) &ale[1];
6017 ale->expiration = expiration;
6018 ale->aid = aid;
6019 ale->nt = nt;
6020 memcpy (&ale[1], address, slen);
6021 address_without_port = get_address_without_port (ale->address);
6023 "Is this %s a local address (%s)\n",
6024 address_without_port,
6025 ale->address);
/* Only the literal IPv4 loopback address is excluded from publication. */
6026 if (0 != strcmp ("127.0.0.1", address_without_port))
6027 ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
6028 GNUNET_free (address_without_port);
6029
6030 return ale;
6031}
6032
6033
/**
 * Handler for an "add address" message from a communicator client.
 *
 * Copies the address string that follows the fixed message header,
 * builds an entry for it via create_address_entry() and inserts that
 * entry into the client's communicator address list.
 *
 * NOTE(review): listing lines 6041 (function name and `cls`
 * parameter), 6050, 6057 and 6065-6066 are missing from this extract.
 * The release of the temporary `address` copy is not visible here
 * (create_address_entry makes its own copy) -- presumably it happens
 * on one of the missing tail lines; confirm.
 *
 * @param aam the message; the 0-terminated address string follows it
 */
6040static void
6042 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
6043{
6044 struct TransportClient *tc = cls;
6045 struct AddressListEntry *ale;
6046 size_t slen;
6047 char *address;
6048
6049 /* 0-termination of &aam[1] was checked in #check_add_address */
6051 "Communicator added address `%s'!\n",
6052 (const char *) &aam[1]);
/* Payload length = total message size minus the fixed header. */
6053 slen = ntohs (aam->header.size) - sizeof(*aam);
6054 address = GNUNET_malloc (slen);
6055 memcpy (address, &aam[1], slen);
6056 ale = create_address_entry (tc,
6058 (enum GNUNET_NetworkType) ntohl (aam->nt),
6059 address,
6060 aam->aid,
6061 slen);
6062 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
6063 tc->details.communicator.addr_tail,
6064 ale);
6067}
6068
6069
/**
 * Handler for a "delete address" message from a communicator client.
 *
 * Rejects clients that are not communicators, then walks the client's
 * address list looking for the entry whose `aid` equals `dam->aid`.
 * The first match ends the handler; if nothing matches, the
 * "removed address we did not even have" message is logged.
 *
 * NOTE(review): listing lines 6077 (function name and `cls`
 * parameter), 6086, 6097, 6100-6101, 6104 and 6106 are missing from
 * this extract, so the actual removal of the matching entry and the
 * client continue/drop calls are not visible here.
 *
 * @param dam the message naming the address identifier to remove
 */
6076static void
6078 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
6079{
6080 struct TransportClient *tc = cls;
6081 struct AddressListEntry *alen;
6082
6083 if (CT_COMMUNICATOR != tc->type)
6084 {
6085 GNUNET_break (0);
6087 return;
6088 }
/* `alen` caches the successor before the body runs, so the loop can
   advance even if the current entry is disposed of inside the body. */
6089 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
6090 NULL != ale;
6091 ale = alen)
6092 {
6093 alen = ale->next;
6094 if (dam->aid != ale->aid)
6095 continue;
6096 GNUNET_assert (ale->tc == tc);
6098 "Communicator deleted address `%s'!\n",
6099 ale->address);
6102 return;
6103 }
6105 "Communicator removed address we did not even have.\n");
6107 // GNUNET_SERVICE_client_drop (tc->client);
6108}
6109
6110
6118static void
6120
6121
/**
 * Callback taking a `struct CoreSentContext` as closure.
 *
 * If the context no longer references a virtual link, it is simply
 * freed.  Otherwise the link's `incoming_fc_window_size_ram` is
 * reduced by `ctx->size`, `incoming_fc_window_size_used` is increased
 * by `ctx->isize`, and the context is freed.
 *
 * NOTE(review): listing lines 6130 (function name and parameters),
 * 6141-6142 and 6145 are missing from this extract; any list
 * bookkeeping or follow-up action on those lines is not visible here.
 */
6129static void
6131{
6132 struct CoreSentContext *ctx = cls;
6133 struct VirtualLink *vl = ctx->vl;
6134
6135 if (NULL == vl)
6136 {
6137 /* lost the link in the meantime, ignore */
6138 GNUNET_free (ctx);
6139 return;
6140 }
6143 vl->incoming_fc_window_size_ram -= ctx->size;
6144 vl->incoming_fc_window_size_used += ctx->isize;
6146 GNUNET_free (ctx);
6147}
6148
6149
/**
 * Pass a message received on a virtual link on to all CORE clients.
 *
 * Visible flow: two drop paths guarded by flow-control checks (the
 * first one protects the `UINT_MAX - size` arithmetic), then a loop
 * over all clients that creates one `struct CoreSentContext` and one
 * envelope per CORE client, then handling of the "no CORE client"
 * case and the final disposition of @a cmc.
 *
 * NOTE(review): listing lines 6151 (function name and the `vl`
 * parameter), 6161, 6165, 6170, 6173, 6175, 6179, 6184, 6198-6199,
 * 6205-6206, 6210, 6214, 6218, 6220, 6225, 6228, 6232, 6236 and 6242
 * are missing from this extract.  In particular the statements
 * controlled by each `if (GNUNET_YES == free_cmc)` and the condition
 * of the second drop path are not visible here.
 *
 * @param mh the message to deliver
 * @param cmc context of the communicator message being handled
 * @param free_cmc #GNUNET_YES enables the (not visible) statements
 *        guarded by the `free_cmc` checks below
 */
6150static void
6152 const struct GNUNET_MessageHeader *mh,
6153 struct CommunicatorMessageContext *cmc,
6154 unsigned int free_cmc)
6155{
6156 uint16_t size = ntohs (mh->size);
6157 int have_core;
6158
/* Adding `size` to the RAM window counter would overflow: drop. */
6159 if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
6160 {
6162 "# CORE messages dropped (FC arithmetic overflow)",
6163 1,
6164 GNUNET_NO);
6166 "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
6167 (unsigned int) ntohs (mh->type),
6168 (unsigned int) ntohs (mh->size));
6169 if (GNUNET_YES == free_cmc)
6171 return;
6172 }
6174 {
6176 "# CORE messages dropped (FC window overflow)",
6177 1,
6178 GNUNET_NO);
6180 "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
6181 (unsigned int) ntohs (mh->type),
6182 (unsigned int) ntohs (mh->size));
6183 if (GNUNET_YES == free_cmc)
6185 return;
6186 }
6187
6188 /* Forward to all CORE clients */
6189 have_core = GNUNET_NO;
6190 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
6191 {
6192 struct GNUNET_MQ_Envelope *env;
6193 struct InboundMessage *im;
6194 struct CoreSentContext *ctx;
6195
6196 if (CT_CORE != tc->type)
6197 continue;
6200 ctx = GNUNET_new (struct CoreSentContext);
6201 ctx->vl = vl;
6202 ctx->size = size;
/* Only the context of the first CORE client carries a non-zero
   `isize`; all later ones get 0. */
6203 ctx->isize = (GNUNET_NO == have_core) ? size : 0;
6204 have_core = GNUNET_YES;
6207 im->peer = cmc->im.sender;
6208 memcpy (&im[1], mh, size);
6209 GNUNET_MQ_send (tc->mq, env);
6211 }
6212 if (GNUNET_NO == have_core)
6213 {
6215 "Dropped message to CORE: no CORE client connected!\n");
6216 /* Nevertheless, count window as used, as it is from the
6217 perspective of the other peer! */
6219 /* TODO-M1 */
6221 "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
6222 (unsigned int) ntohs (mh->type),
6223 (unsigned int) ntohs (mh->size));
6224 if (GNUNET_YES == free_cmc)
6226 return;
6227 }
6229 "Delivered message from %s of type %u to CORE recv window %d\n",
6230 GNUNET_i2s (&cmc->im.sender),
6231 ntohs (mh->type),
/* CORE still has receive window left: finish right away. */
6233 if (vl->core_recv_window > 0)
6234 {
6235 if (GNUNET_YES == free_cmc)
6237 return;
6238 }
6239 /* Wait with calling #finish_cmc_handling(cmc) until the message
6240 was processed by CORE MQs (for CORE flow control)! */
6241 if (GNUNET_YES == free_cmc)
6243}
6244
6245
/**
 * Handle a raw (unboxed) message arriving from a communicator.
 *
 * Messages whose size is below a message header or too large to fit
 * behind a `struct InboundMessage` are rejected.  If there is no
 * confirmed virtual link to the sender, the message is copied and
 * parked in `ring_buffer` (together with @a cmc) instead of being
 * delivered, and the handler returns.
 *
 * NOTE(review): listing lines 6255 (function name and parameters; the
 * body uses `cls` and `mh`), 6265, 6277, 6300, 6310, 6315-6317, 6320,
 * 6323, 6325, 6327-6328, 6341-6342 and 6346 are missing from this
 * extract.  That includes the condition guarding the eviction block,
 * the head-index update and the call made on the normal (link up)
 * path.
 */
6254static void
6256{
6257 struct CommunicatorMessageContext *cmc = cls;
6258 // struct CommunicatorMessageContext *cmc_copy =
6259 // GNUNET_new (struct CommunicatorMessageContext);
6260 struct GNUNET_MessageHeader *mh_copy;
6261 struct RingBufferEntry *rbe;
6262 struct VirtualLink *vl;
6263 uint16_t size = ntohs (mh->size);
6264
6266 "Handling raw message of type %u with %u bytes\n",
6267 (unsigned int) ntohs (mh->type),
6268 (unsigned int) ntohs (mh->size));
6269
/* Size sanity check: must hold at least a header and still leave
   room for the InboundMessage wrapper within 16 bits. */
6270 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
6271 (size < sizeof(struct GNUNET_MessageHeader)))
6272 {
6273 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6274
6275 GNUNET_break (0);
6276 finish_cmc_handling (cmc);
6278 return;
6279 }
6280 vl = lookup_virtual_link (&cmc->im.sender);
6281 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6282 {
6283 /* FIXME: sender is giving us messages for CORE but we don't have
6284 the link up yet! I *suspect* this can happen right now (i.e.
6285 sender has verified us, but we didn't verify sender), but if
6286 we pass this on, CORE would be confused (link down, messages
6287 arrive). We should investigate more if this happens often,
6288 or in a persistent manner, and possibly do "something" about
6289 it. Thus logging as error for now. */
6290
/* Park a private copy of the message; the entry keeps `cmc` too. */
6291 mh_copy = GNUNET_malloc (size);
6292 rbe = GNUNET_new (struct RingBufferEntry);
6293 rbe->cmc = cmc;
6294 /*cmc_copy->tc = cmc->tc;
6295 cmc_copy->im = cmc->im;*/
6296 GNUNET_memcpy (mh_copy, mh, size);
6297
6298 rbe->mh = mh_copy;
6299
/* Evict the entry currently occupying the head slot, releasing its
   context, message copy and the entry itself (guard not visible). */
6301 {
6302 struct RingBufferEntry *rbe_old = ring_buffer[ring_buffer_head];
6303 GNUNET_free (rbe_old->cmc);
6304 GNUNET_free (rbe_old->mh);
6305 GNUNET_free (rbe_old);
6306 }
6307 ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
6308 // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6309 cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6311 "Storing message for %s and type %u (%u) in ring buffer head %u is full %u\n",
6312 GNUNET_i2s (&cmc->im.sender),
6313 (unsigned int) ntohs (mh->type),
6314 (unsigned int) ntohs (mh_copy->type),
6318 {
6319 ring_buffer_head = 0;
6321 }
6322 else
6324
6326 "%u items stored in ring buffer\n",
6329
6330 /*GNUNET_break_op (0);
6331 GNUNET_STATISTICS_update (GST_stats,
6332 "# CORE messages dropped (virtual link still down)",
6333 1,
6334 GNUNET_NO);
6335
6336 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6337 "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
6338 (unsigned int) ntohs (mh->type),
6339 (unsigned int) ntohs (mh->size));
6340 finish_cmc_handling (cmc);*/
6343 // GNUNET_free (cmc);
6344 return;
6345 }
6347}
6348
6349
/**
 * Sanity-check a fragment box message.
 *
 * A fragment is rejected when it carries no payload, when its payload
 * would extend past the announced total message size, or when its
 * offset lies at or beyond that total size.
 *
 * NOTE(review): listing line 6358 (function name and parameters; the
 * body uses `cls` and `fb`) is missing from this extract.
 * NOTE(review): `bsize` is computed by unsigned subtraction; this
 * presumably relies on the caller having ensured the message is at
 * least `sizeof(*fb)` bytes -- confirm.
 *
 * @return #GNUNET_YES if the fragment passes all checks,
 *         #GNUNET_SYSERR otherwise
 */
6357static int
6359{
6360 uint16_t size = ntohs (fb->header.size);
6361 uint16_t bsize = size - sizeof(*fb);
6362
6363 (void) cls;
/* Empty fragments are not acceptable. */
6364 if (0 == bsize)
6365 {
6366 GNUNET_break_op (0);
6367 return GNUNET_SYSERR;
6368 }
/* Fragment end must not exceed the total message size. */
6369 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
6370 {
6371 GNUNET_break_op (0);
6372 return GNUNET_SYSERR;
6373 }
/* Fragment start must lie inside the message. */
6374 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
6375 {
6376 GNUNET_break_op (0);
6377 return GNUNET_SYSERR;
6378 }
6379 return GNUNET_YES;
6380}
6381
6382
/**
 * Task that releases an acknowledgement cummulator.
 *
 * Clears the task handle, insists that no ACKs are pending
 * (`num_acks` must be 0) and frees the structure.
 *
 * NOTE(review): listing lines 6389 (function name and `cls`
 * parameter), 6395 and 6397 are missing from this extract; the
 * expression compared against #GNUNET_YES is therefore not visible
 * (presumably the removal of `ac` from a map -- confirm).
 */
6388static void
6390{
6391 struct AcknowledgementCummulator *ac = cls;
6392
6393 ac->task = NULL;
6394 GNUNET_assert (0 == ac->num_acks);
6396 GNUNET_YES ==
6398 GNUNET_free (ac);
6399}
6400
6401
/**
 * Task that sends out all ACKs collected in a cummulator.
 *
 * Builds a `struct TransportReliabilityAckMessage` in a stack buffer
 * sized for `ac->num_acks` payload entries, copies the collected ACK
 * UUIDs into it, and sends it either over the confirmed virtual link
 * to the target or, failing that, via the neighbour entry for the
 * target (if one exists).  Afterwards `num_acks` is reset to 0.
 *
 * NOTE(review): listing lines 6408 (function name and `cls`
 * parameter), 6415, 6418, 6421, 6426, 6435-6436, 6445, 6448, 6455 and
 * 6461-6462 are missing from this extract, including the declaration
 * of `ap`, the message type assignment, the per-entry delay field and
 * the names of the routing calls.
 */
6407static void
6409{
6410 struct Neighbour *n;
6411 struct VirtualLink *vl;
6412 struct AcknowledgementCummulator *ac = cls;
/* Variable-length stack buffer: header plus one payload per ACK. */
6413 char buf[sizeof(struct TransportReliabilityAckMessage)
6414 + ac->num_acks
6416 struct TransportReliabilityAckMessage *ack =
6417 (struct TransportReliabilityAckMessage *) buf;
6419
6420 ac->task = NULL;
6422 "Sending ACK with %u components to %s\n",
6423 ac->num_acks,
6424 GNUNET_i2s (&ac->target));
6425 GNUNET_assert (0 < ac->num_acks);
6427 ack->header.size =
6428 htons (sizeof(*ack)
6429 + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
/* The running counter is advanced by the number of ACKs in this
   message before being written out. */
6430 ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
6431 ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
6432 for (unsigned int i = 0; i < ac->num_acks; i++)
6433 {
6434 ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
6437 }
6438 /*route_control_message_without_fc (
6439 &ac->target,
6440 &ack->header,
6441 RMO_DV_ALLOWED);*/
6442 vl = lookup_virtual_link (&ac->target);
6443 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6444 {
6446 vl,
6447 &ack->header,
6449 }
6450 else
6451 {
6452 /* Use route via neighbour */
6453 n = lookup_neighbour (&ac->target);
6454 if (NULL != n)
6456 n,
6457 &ack->header,
6458 RMO_NONE);
6459 }
6460 ac->num_acks = 0;
6463 ac);
6464}
6465
6466
/**
 * Queue an acknowledgement for (possibly delayed, batched)
 * transmission to a peer.
 *
 * If no cummulator exists for the peer yet, one is set up with the
 * peer as target and @a max_delay as its `min_transmission_time`.
 * Otherwise, a full cummulator (#MAX_CUMMULATIVE_ACKS entries) is
 * handled first.  In both cases the UUID is then appended to
 * `ac->ack_uuids` and `num_acks` is incremented.
 *
 * NOTE(review): listing lines 6476 (function name and the `pid`
 * parameter), 6482, 6486, 6489, 6492-6494, 6497, 6504, 6506-6508,
 * 6510-6511 and 6514-6515 are missing from this extract, including
 * the lookup/allocation of `ac`, the flush on a full buffer and the
 * (re)scheduling of the transmission task.
 *
 * @param ack_uuid UUID to acknowledge
 * @param max_delay latest time by which the ACK should go out
 */
6475static void
6477 const struct AcknowledgementUUIDP *ack_uuid,
6478 struct GNUNET_TIME_Absolute max_delay)
6479{
6480 struct AcknowledgementCummulator *ac;
6481
6483 "Scheduling ACK %s for transmission to %s\n",
6484 GNUNET_uuid2s (&ack_uuid->value),
6485 GNUNET_i2s (pid));
6487 if (NULL == ac)
6488 {
6490 ac->target = *pid;
6491 ac->min_transmission_time = max_delay;
6495 &ac->target,
6496 ac,
6498 }
6499 else
6500 {
6501 if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
6502 {
6503 /* must run immediately, ack buffer full! */
6505 }
6509 }
6512 ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6513 ac->num_acks++;
6516 ac);
6517}
6518
6519
6524{
6529
6534};
6535
6536
6546static int
6547find_by_message_uuid (void *cls, uint32_t key, void *value)
6548{
6549 struct FindByMessageUuidContext *fc = cls;
6550 struct ReassemblyContext *rc = value;
6551
6552 (void) key;
6553 if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6554 {
6555 fc->rc = rc;
6556 return GNUNET_NO;
6557 }
6558 return GNUNET_YES;
6559}
6560
6561
/**
 * Handle a fragment box: store the fragment in the reassembly context
 * of its message, acknowledge it, and once no bytes are missing hand
 * the reassembled message on.
 *
 * Visible flow: require a confirmed virtual link to the sender; set
 * up the link's reassembly structures on first use; look up (via
 * find_by_message_uuid()) or allocate the reassembly context -- the
 * payload buffer and a one-bit-per-byte bitfield sit directly behind
 * the context in the same allocation; copy the fragment in and update
 * the bitfield and `msg_missing`; schedule a cumulative ACK; and, if
 * the message is complete, verify that the size in its header equals
 * the announced size.
 *
 * NOTE(review): listing lines 6570 (function name and parameters; the
 * body uses `cls` and `fb`), 6588, 6593, 6598, 6600-6603, 6609, 6611,
 * 6622-6624, 6626-6628, 6632, 6639, 6652, 6693, 6699-6700, 6712,
 * 6717, 6723 and 6728 are missing from this extract, including the
 * map/heap creation, the timeout set-up, the initial value of
 * `cdelay` and the final dispatch/cleanup calls.
 * NOTE(review): for a new context whose first fragment has
 * `fsize == msg_size`, `msg_missing` starts at 0 and is then
 * decremented once per newly marked byte in the loop below; assuming
 * an unsigned field it would wrap and the completion test would not
 * fire -- confirm whether such a fragment can occur.
 */
6569static void
6571{
6572 struct CommunicatorMessageContext *cmc = cls;
6573 struct VirtualLink *vl;
6574 struct ReassemblyContext *rc;
6575 const struct GNUNET_MessageHeader *msg;
6576 uint16_t msize;
6577 uint16_t fsize;
6578 uint16_t frag_off;
6579 char *target;
6580 struct GNUNET_TIME_Relative cdelay;
6581 struct FindByMessageUuidContext fc;
6582
6583 vl = lookup_virtual_link (&cmc->im.sender);
6584 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6585 {
6586 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6587
6589 "No virtual link for %s to handle fragment\n",
6590 GNUNET_i2s (&cmc->im.sender));
6591 GNUNET_break (0);
6592 finish_cmc_handling (cmc);
6594 return;
6595 }
/* Reassembly structures are created lazily per virtual link. */
6596 if (NULL == vl->reassembly_map)
6597 {
6599 vl->reassembly_heap =
6604 vl);
6605 }
6606 msize = ntohs (fb->msg_size);
6607 fc.message_uuid = fb->msg_uuid;
6608 fc.rc = NULL;
6610 fb->msg_uuid.uuid,
6612 &fc);
6613 fsize = ntohs (fb->header.size) - sizeof(*fb);
6614 if (NULL == (rc = fc.rc))
6615 {
6616 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6617 + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6618 rc->msg_uuid = fb->msg_uuid;
6619 rc->virtual_link = vl;
6620 rc->msg_size = msize;
6621 rc->reassembly_timeout =
6625 rc,
6629 vl->reassembly_map,
6630 rc->msg_uuid.uuid,
6631 rc,
/* Layout: [context][payload of msg_size bytes][bitfield]. */
6633 target = (char *) &rc[1];
6634 rc->bitfield = (uint8_t *) (target + rc->msg_size);
6635 if (fsize != rc->msg_size)
6636 rc->msg_missing = rc->msg_size;
6637 else
6638 rc->msg_missing = 0;
6640 "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %"
6641 PRIu64 "\n",
6642 fsize,
6643 ntohs (fb->frag_off),
6644 msize,
6645 rc->msg_missing,
6646 GNUNET_i2s (&cmc->im.sender),
6647 fb->msg_uuid.uuid);
6648 }
6649 else
6650 {
6651 target = (char *) &rc[1];
6653 "Received fragment at offset %u/%u from %s for message %u\n",
6654 ntohs (fb->frag_off),
6655 msize,
6656 GNUNET_i2s (&cmc->im.sender),
6657 (unsigned int) fb->msg_uuid.uuid);
6658 }
/* All fragments of one message must announce the same total size. */
6659 if (msize != rc->msg_size)
6660 {
6661 GNUNET_break (0);
6662 finish_cmc_handling (cmc);
6663 return;
6664 }
6665
6666 /* reassemble */
6667 if (0 == fsize)
6668 {
6669 GNUNET_break (0);
6670 finish_cmc_handling (cmc);
6671 return;
6672 }
6673 frag_off = ntohs (fb->frag_off);
6674 if (frag_off + fsize > msize)
6675 {
6676 /* Fragment (plus fragment size) exceeds message size! */
6677 GNUNET_break_op (0);
6678 finish_cmc_handling (cmc);
6679 return;
6680 }
6681 memcpy (&target[frag_off], &fb[1], fsize);
6682 /* update bitfield and msg_missing */
/* One bit per payload byte; only bytes seen for the first time
   reduce the missing counter, so duplicates are harmless. */
6683 for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6684 {
6685 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6686 {
6687 rc->bitfield[i / 8] |= (1 << (i % 8));
6688 rc->msg_missing--;
6689 }
6690 }
6691
6692 /* Compute cumulative ACK */
/* The delay is scaled by the number of fragment-sized pieces still
   missing; a complete message is acknowledged without delay. */
6694 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6695 if (0 == rc->msg_missing)
6696 cdelay = GNUNET_TIME_UNIT_ZERO;
6697 cummulative_ack (&cmc->im.sender,
6698 &fb->ack_uuid,
6701 /* is reassembly complete? */
6702 if (0 != rc->msg_missing)
6703 {
6704 finish_cmc_handling (cmc);
6705 return;
6706 }
6707 /* reassembly is complete, verify result */
6708 msg = (const struct GNUNET_MessageHeader *) &rc[1];
6709 if (ntohs (msg->size) != rc->msg_size)
6710 {
6711 GNUNET_break (0);
6713 finish_cmc_handling (cmc);
6714 return;
6715 }
6716 /* successful reassembly */
6718 "Fragment reassembly complete for message %u\n",
6719 (unsigned int) fb->msg_uuid.uuid);
6720 /* FIXME: check that the resulting msg is NOT a
6721 DV Box or Reliability Box, as that is NOT allowed! */
6722 cmc->mh = msg;
6724 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6725 en-route and we forget that we finished this reassembly immediately!
6726 -> keep around until timeout?
6727 -> shorten timeout based on ACK? */
6729}
6730
6731
/**
 * Check callback for a reliability box message.
 *
 * The visible lines only log the sizes and type of the boxed message
 * that follows @a rb and then return #GNUNET_YES.
 *
 * NOTE(review): listing lines 6740 (function name and `cls`
 * parameter), 6747 and 6754 are missing from this extract; any actual
 * validation of the inner message (line 6754) is not visible here.
 *
 * @param rb the reliability box; the boxed message header follows it
 * @return #GNUNET_YES on the visible path
 */
6739static int
6741 const struct TransportReliabilityBoxMessage *rb)
6742{
6743 (void) cls;
6744 const struct GNUNET_MessageHeader *box = (const struct
6745 GNUNET_MessageHeader *) &rb[1];
6746
6748 "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6749 ntohs (rb->header.size),
6750 ntohs (box->type),
6751 ntohs (box->size),
6752 sizeof (struct TransportReliabilityBoxMessage),
6753 sizeof (struct GNUNET_MessageHeader));
6755 return GNUNET_YES;
6756}
6757
6758
/**
 * Handle a reliability box: acknowledge it and continue with the
 * message it wraps.
 *
 * The ACK for `rb->ack_uuid` is addressed to the sender, with a delay
 * that depends on whether `rb->ack_countdown` is zero (the non-zero
 * case visibly involves one eighth of a fixed one-second "RTT").
 * `cmc->mh` is then pointed at the inner message.
 *
 * NOTE(review): listing lines 6767 (function name and `cls`
 * parameter), 6775, 6778, 6784, 6788-6789 and 6795 are missing from
 * this extract, including the head of the ACK call, both branches'
 * time expressions and the call that processes the inner message.
 *
 * @param rb the reliability box; the boxed message follows it
 */
6766static void
6768 const struct TransportReliabilityBoxMessage *rb)
6769{
6770 struct CommunicatorMessageContext *cmc = cls;
6771 const struct GNUNET_MessageHeader *inbox =
6772 (const struct GNUNET_MessageHeader *) &rb[1];
6773 struct GNUNET_TIME_Relative rtt;
6774
6776 "Received reliability box from %s with UUID %s of type %u\n",
6777 GNUNET_i2s (&cmc->im.sender),
6779 (unsigned int) ntohs (inbox->type));
6780 rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6781 do not really have an RTT for the
6782 * incoming* queue (should we have
6783 the sender add it to the rb message?) */
6785 &cmc->im.sender,
6786 &rb->ack_uuid,
6787 (0 == ntohl (rb->ack_countdown))
6790 GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6791 /* continue with inner message */
6792 /* FIXME: check that inbox is NOT a DV Box, fragment or another
6793 reliability box (not allowed!) */
6794 cmc->mh = inbox;
6796}
6797
6798
6807static void
6808update_pd_age (struct PerformanceData *pd, unsigned int age)
6809{
6810 unsigned int sage;
6811
6812 if (age == pd->last_age)
6813 return; /* nothing to do */
6814 sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6815 for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6816 {
6817 struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6818
6819 the->bytes_sent = 0;
6820 the->bytes_received = 0;
6821 }
6822 pd->last_age = age;
6823}
6824
6825
/**
 * Fold a new RTT sample and a count of successfully transmitted bytes
 * into a performance data record.
 *
 * The aged RTT is replaced outright while it still holds the
 * "forever" value; otherwise it becomes a weighted average giving the
 * new sample 1/8 and the old value 7/8.  The record is then aged via
 * update_pd_age() and the byte count is added to the
 * `bytes_received` counter of the history slot for the current age.
 *
 * NOTE(review): listing line 6835 (function name and the `pd`
 * parameter) is missing from this extract.
 *
 * @param rtt latest round-trip time sample
 * @param bytes_transmitted_ok number of bytes confirmed as delivered
 */
6834static void
6836 struct GNUNET_TIME_Relative rtt,
6837 uint16_t bytes_transmitted_ok)
6838{
6839 uint64_t nval = rtt.rel_value_us;
6840 uint64_t oval = pd->aged_rtt.rel_value_us;
6841 unsigned int age = get_age ();
/* The slot pointer is taken before aging; update_pd_age() may reset
   this very slot before the bytes are added below. */
6842 struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6843
6844 if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6845 pd->aged_rtt = rtt;
6846 else
6847 pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
6848 update_pd_age (pd, age);
6849 the->bytes_received += bytes_transmitted_ok;
6850}
6851
6852
/**
 * Record a new RTT sample and delivered byte count for a queue by
 * forwarding to update_performance_data() on the queue's `pd` member.
 *
 * NOTE(review): listing line 6861 (function name and the `q`
 * parameter) is missing from this extract.
 *
 * @param rtt latest round-trip time sample
 * @param bytes_transmitted_ok number of bytes confirmed as delivered
 */
6860static void
6862 struct GNUNET_TIME_Relative rtt,
6863 uint16_t bytes_transmitted_ok)
6864{
6865 update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6866}
6867
6868
/**
 * Record a new RTT sample and delivered byte count for the object
 * named `dvh` by forwarding to update_performance_data() on its `pd`
 * member.
 *
 * NOTE(review): listing line 6877 (function name and the `dvh`
 * parameter) is missing from this extract.
 *
 * @param rtt latest round-trip time sample
 * @param bytes_transmitted_ok number of bytes confirmed as delivered
 */
6876static void
6878 struct GNUNET_TIME_Relative rtt,
6879 uint16_t bytes_transmitted_ok)
6880{
6881 update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6882}
6883
6884
6892static void
6894{
6895 struct PendingMessage *pos;
6896
6898 "Complete transmission of message %" PRIu64 " %u\n",
6899 pm->logging_uuid,
6900 pm->pmt);
6901 switch (pm->pmt)
6902 {
6903 case PMT_CORE:
6905 /* Full message sent, we are done */
6907 return;
6908
6909 case PMT_FRAGMENT_BOX:
6910 /* Fragment sent over reliable channel */
6911 pos = pm->frag_parent;
6915 "pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
6916 (unsigned long) pos->frag_off,
6917 (unsigned long) pos->bytes_msg,
6918 pos->pmt,
6919 NULL == pos->frag_parent ? 1 : 0);
6920 /* check if subtree is done */
6921 while ((NULL == pos->head_frag) && (pos->frag_off == (pos->bytes_msg
6922 - sizeof(struct
6924 &&
6925 (NULL != pos->frag_parent))
6926 {
6927 pm = pos;
6928 pos = pm->frag_parent;
6929 if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6930 {
6932 return;
6933 }
6934 else if (PMT_DV_BOX == pm->pmt)
6935 {