GNUnet 0.21.2
gnunet-service-transport.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
75#include "platform.h"
76#include "gnunet_util_lib.h"
80#include "gnunet_nat_service.h"
82#include "gnunet_signatures.h"
83#include "transport.h"
84
88#define RING_BUFFER_SIZE 16
89
93#define MAX_FC_RETRANSMIT_COUNT 1000
94
99#define MAX_CUMMULATIVE_ACKS 64
100
113#define FC_NO_CHANGE_REPLY_PROBABILITY 8
114
119#define IN_PACKET_SIZE_WITHOUT_MTU 128
120
125#define GOODPUT_AGING_SLOTS 4
126
131#define DEFAULT_WINDOW_SIZE (128 * 1024)
132
141#define MAX_INCOMING_REQUEST 16
142
147#define MAX_DV_DISCOVERY_SELECTION 16
148
157#define RECV_WINDOW_SIZE 4
158
166#define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
167
171#define MAX_DV_HOPS_ALLOWED 16
172
177#define MAX_DV_LEARN_PENDING 64
178
182#define MAX_DV_PATHS_TO_TARGET 3
183
189#define DELAY_WARN_THRESHOLD \
190 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
191
196#define DV_FORWARD_TIMEOUT \
197 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
198
202#define DEFAULT_ACK_WAIT_DURATION \
203 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
204
210#define DV_QUALITY_RTT_THRESHOLD \
211 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
212
217#define DV_PATH_VALIDITY_TIMEOUT \
218 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
219
224#define BACKCHANNEL_INACTIVITY_TIMEOUT \
225 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
226
231#define DV_PATH_DISCOVERY_FREQUENCY \
232 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
233
237#define EPHEMERAL_VALIDITY \
238 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
239
243#define REASSEMBLY_EXPIRATION \
244 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
245
250#define FAST_VALIDATION_CHALLENGE_FREQ \
251 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
252
256#define MAX_VALIDATION_CHALLENGE_FREQ \
257 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
258
264#define ACK_CUMMULATOR_TIMEOUT \
265 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
266
271#define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
272
277#define DV_LEARN_QUALITY_THRESHOLD 100
278
282#define MAX_ADDRESS_VALID_UNTIL \
283 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
284
288#define ADDRESS_VALIDATION_LIFETIME \
289 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
290
297#define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
298
305#define VALIDATION_RTT_BUFFER_FACTOR 3
306
313#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
314
320#define QUEUE_LENGTH_LIMIT 32
321
325#define QUEUE_ENTRY_TIMEOUT \
326 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
327
328
330
335{
341};
342
343
348{
353};
354
359{
364
365 /* Followed by *another* message header which is the message to
366 the communicator */
367
368 /* Followed by a 0-terminated name of the communicator */
369};
370
371
376{
381
397
402
408};
409
410
416{
421
427
439
440 /* Followed by a `struct GNUNET_MessageHeader` with a message
441 for the target peer */
442};
443
444
450{
455
463
470};
471
472
477{
485
490};
491
492
501{
506
512
513 /* followed by any number of `struct TransportCummulativeAckPayloadP`
514 messages providing ACKs */
515};
516
517
522{
527
532
537
546
552};
553
554
573{
578
592
597};
598
599
617{
622
627
632
637};
638
639
645{
650
656};
657
658
673{
678
684
694
701
715
721
726
731
732 /* Followed by @e num_hops `struct DVPathEntryP` values,
733 excluding the initiator of the DV trace; the last entry is the
734 current sender; the current peer must not be included. */
735};
736
737
761{
766
770 unsigned int without_fc;
771
779
786
792
799
806
813
814 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
815 excluding the @e origin and the current peer, the last must be
816 the ultimate target; if @e num_hops is zero, the receiver of this
817 message is the ultimate target. */
818
819 /* Followed by encrypted, variable-size payload, which
820 must begin with a `struct TransportDVBoxPayloadP` */
821
822 /* Followed by the actual message, which itself must not be a
823 a DV_LEARN or DV_BOX message! */
824};
825
826
832{
837
842
847
853};
854
855
861{
866
872
877};
878
879
885{
890
895
901
906
911 struct GNUNET_TIME_AbsoluteNBO origin_time;
912
917 struct GNUNET_TIME_RelativeNBO validity_duration;
918};
919
921{
925 unsigned int address_length;
926
927 /* Followed by @e address_length bytes of the address. */
928};
929
939{
944
953
959
966
976
986
987
992
997
998 /* Followed by @e number_of_addresses struct TransportGlobalNattedAddress. */
999};
1000
1002
1003
1008{
1013
1018
1023
1028
1032 CT_APPLICATION = 4
1034
1035
1041{
1046
1051
1056
1061
1067 RMO_REDUNDANT = 4
1069
1070
1075{
1080
1085
1090
1096};
1097
1098
1104{
1108 uint64_t bytes_sent;
1109
1115};
1116
1117
1122{
1127
1133
1138 unsigned int last_age;
1139};
1140
1141
1145struct TransportClient;
1146
1150struct Neighbour;
1151
1156struct DistanceVector;
1157
1162struct Queue;
1163
1167struct PendingMessage;
1168
1172struct DistanceVectorHop;
1173
1182struct VirtualLink;
1183
1184
1190{
1196
1202
1207
1212
1217
1222 uint16_t total_hops;
1223
1227 unsigned int continue_send;
1228};
1229
1230
1235{
1240
1245};
1246
1247
1252{
1257
1262
1267
1271 uint16_t size;
1272
1279 uint16_t isize;
1280};
1281
1282
1287{
1293
1298
1303
1311 uint8_t *bitfield;
1312
1317
1323
1327 uint16_t msg_size;
1328
1333 uint16_t msg_missing;
1334
1335 /* Followed by @e msg_size bytes of the (partially) defragmented original
1336 * message */
1337
1338 /* Followed by @e bitfield data */
1339};
1340
1341
1351{
1356
1363
1370
1375
1381
1387
1392
1397
1402
1407
1415
1421
1426
1431 unsigned int confirmed;
1432
1436 struct Neighbour *n;
1437
1442
1449
1456
1465
1471
1477
1486
1494
1501
1510
1523
1529
1536
1547
1552 uint32_t fc_seq_gen;
1553
1559 uint32_t last_fc_seq;
1560
1573};
1574
1575
1580{
1586
1592
1599
1606
1613
1620
1627
1634
1639
1645
1651
1656 struct Queue *queue;
1657
1662
1667
1671 unsigned int num_send;
1672};
1673
1674
1679{
1684
1689
1694
1699
1704
1709
1714
1719
1726
1732
1741
1746
1752 unsigned int distance;
1753};
1754
1755
1761{
1766
1771
1776
1781
1787
1793
1798
1803
1808
1813};
1814
1815
1826{
1831
1836
1840 struct Queue *queue;
1841
1846
1850 uint64_t mid;
1851
1856};
1857
1858
1863struct Queue
1864{
1869
1874
1879
1884
1889
1894
1899
1904
1909
1914
1918 const char *address;
1919
1923 unsigned int unlimited_length;
1924
1930
1939
1944
1950
1955 uint64_t mid_gen;
1956
1960 uint32_t qid;
1961
1965 uint32_t mtu;
1966
1971
1976
1980 unsigned int queue_length;
1981
1985 uint64_t q_capacity;
1986
1990 uint32_t priority;
1991
1996
2001
2006 int idle;
2007
2012};
2013
2014
2019{
2024
2030
2036
2041
2046
2052
2058
2064
2070
2076
2081
2086
2091
2096};
2097
2098
2104{
2109
2114
2119
2124};
2125
2126
2130struct PeerRequest
2131{
2136
2141
2146
2153
2158};
2159
2160
2165{
2170
2175
2180
2184 PMT_DV_BOX = 3
2186
2187
2214struct PendingMessage
2215{
2220
2225
2230
2235
2241
2247
2252
2257
2263
2269
2274
2284
2289
2294
2299
2304
2309
2314
2320
2326
2331
2337
2342
2346 uint16_t bytes_msg;
2347
2351 uint16_t frag_off;
2352
2357
2362
2366 uint16_t frag_count;
2367
2372
2373 /* Followed by @e bytes_msg to transmit */
2374};
2375
2376
2381{
2387
2392};
2393
2394
2400{
2405
2410
2417
2422
2428 uint32_t ack_counter;
2429
2433 unsigned int num_acks;
2434};
2435
2436
2441{
2446
2451
2456
2461
2465 const char *address;
2466
2471
2476
2481
2486
2492
2496 uint32_t aid;
2497
2502};
2503
2504
2509{
2514
2519
2524
2529
2534
2535 union
2536 {
2540 struct
2541 {
2547
2553
2557 struct
2558 {
2565
2571
2572
2576 struct
2577 {
2583
2588
2593
2599
2605
2612
2617
2623
2627 struct
2628 {
2636};
2637
2638
2644{
2650
2658
2664
2671 struct GNUNET_TIME_Absolute first_challenge_use;
2672
2679 struct GNUNET_TIME_Absolute last_challenge_use;
2680
2688 struct GNUNET_TIME_Absolute next_challenge;
2689
2698 struct GNUNET_TIME_Relative challenge_backoff;
2699
2704 struct GNUNET_TIME_Relative validation_rtt;
2705
2713 struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2714
2718 struct GNUNET_HashCode hc;
2719
2723 struct GNUNET_SCHEDULER_Task *revalidation_task;
2724
2728 char *address;
2729
2735 struct GNUNET_CONTAINER_HeapNode *hn;
2736
2742
2748 uint32_t last_window_consum_limit;
2749
2754 int awaiting_queue;
2755};
2756
2757
2765{
2770
2775
2780
2785
2791
2796
2802
2808
2814};
2815
2820
2824static unsigned int ring_buffer_head;
2825
2829static unsigned int is_ring_buffer_full;
2830
2835
2839static unsigned int ring_buffer_dv_head;
2840
2844static unsigned int is_ring_buffer_dv_full;
2845
2850
2855
2860
2865
2870
2875
2880
2886
2892
2898
2904
2910
2916
2922
2928
2933
2937static struct LearnLaunchEntry *lle_head = NULL;
2938
2942static struct LearnLaunchEntry *lle_tail = NULL;
2943
2950
2955
2960
2965
2970
2977
2982
2986static unsigned int ir_total;
2987
2991static unsigned long long logging_uuid_gen;
2992
3002
3007static int in_shutdown;
3008
3019static unsigned int
3021{
3022 struct GNUNET_TIME_Absolute now;
3023
3024 now = GNUNET_TIME_absolute_get ();
3025 return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
3026}
3027
3028
3034static void
3036{
3038 GNUNET_assert (ir_total > 0);
3039 ir_total--;
3040 if (NULL != ir->nc)
3042 ir->nc = NULL;
3043 GNUNET_free (ir);
3044}
3045
3046
3052static void
3054{
3055 struct Queue *q = pa->queue;
3056 struct PendingMessage *pm = pa->pm;
3057 struct DistanceVectorHop *dvh = pa->dvh;
3058
3060 "free_pending_acknowledgement\n");
3061 if (NULL != q)
3062 {
3063 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
3064 pa->queue = NULL;
3065 }
3066 if (NULL != pm)
3067 {
3069 "remove pa from message\n");
3071 "remove pa from message %" PRIu64 "\n",
3072 pm->logging_uuid);
3074 "remove pa from message %u\n",
3075 pm->pmt);
3077 "remove pa from message %s\n",
3079 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3080 pa->pm = NULL;
3081 }
3082 if (NULL != dvh)
3083 {
3084 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
3085 pa->queue = NULL;
3086 }
3089 &pa->ack_uuid.value,
3090 pa));
3091 GNUNET_free (pa);
3092}
3093
3094
3103static void
3105{
3106 struct PendingMessage *frag;
3107
3108 while (NULL != (frag = root->head_frag))
3109 {
3110 struct PendingAcknowledgement *pa;
3111
3112 free_fragment_tree (frag);
3113 while (NULL != (pa = frag->pa_head))
3114 {
3115 GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
3116 pa->pm = NULL;
3117 }
3118 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
3119 if (NULL != frag->qe)
3120 {
3121 GNUNET_assert (frag == frag->qe->pm);
3122 frag->qe->pm = NULL;
3123 }
3125 "Free frag %p\n",
3126 frag);
3127 GNUNET_free (frag);
3128 }
3129}
3130
3131
3139static void
3141{
3142 struct TransportClient *tc = pm->client;
3143 struct VirtualLink *vl = pm->vl;
3144 struct PendingAcknowledgement *pa;
3145
3147 "Freeing pm %p\n",
3148 pm);
3149 if (NULL != tc)
3150 {
3152 tc->details.core.pending_msg_head,
3153 tc->details.core.pending_msg_tail,
3154 pm);
3155 }
3156 if ((NULL != vl) && (NULL == pm->frag_parent))
3157 {
3159 "Removing pm %" PRIu64 "\n",
3160 pm->logging_uuid);
3162 vl->pending_msg_head,
3163 vl->pending_msg_tail,
3164 pm);
3165 }
3166 else if (NULL != pm->frag_parent && PMT_DV_BOX != pm->pmt)
3167 {
3168 struct PendingMessage *root = pm->frag_parent;
3169
3170 while (NULL != root->frag_parent && PMT_DV_BOX != root->pmt)
3171 root = root->frag_parent;
3172
3173 root->frag_count--;
3174 }
3175 while (NULL != (pa = pm->pa_head))
3176 {
3177 if (NULL == pa)
3179 "free pending pa null\n");
3180 if (NULL == pm->pa_tail)
3182 "free pending pa_tail null\n");
3183 if (NULL == pa->prev_pa)
3185 "free pending pa prev null\n");
3186 if (NULL == pa->next_pa)
3188 "free pending pa next null\n");
3189 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3190 pa->pm = NULL;
3191 }
3192
3194 if (NULL != pm->qe)
3195 {
3196 GNUNET_assert (pm == pm->qe->pm);
3197 pm->qe->pm = NULL;
3198 }
3199 if (NULL != pm->bpm)
3200 {
3201 free_fragment_tree (pm->bpm);
3202 if (NULL != pm->bpm->qe)
3203 {
3204 struct QueueEntry *qe = pm->bpm->qe;
3205
3206 qe->pm = NULL;
3207 }
3208 GNUNET_free (pm->bpm);
3209 }
3210
3211 GNUNET_free (pm);
3213 "Freeing pm done\n");
3214}
3215
3216
3222static void
3224{
3225 struct VirtualLink *vl = rc->virtual_link;
3226
3230 rc->msg_uuid.uuid,
3231 rc));
3232 GNUNET_free (rc);
3233}
3234
3235
3241static void
3243{
3244 struct VirtualLink *vl = cls;
3245 struct ReassemblyContext *rc;
3246
3247 vl->reassembly_timeout_task = NULL;
3248 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3249 {
3251 .rel_value_us)
3252 {
3254 continue;
3255 }
3260 vl);
3261 return;
3262 }
3263}
3264
3265
3274static int
3275free_reassembly_cb (void *cls, uint32_t key, void *value)
3276{
3277 struct ReassemblyContext *rc = value;
3278
3279 (void) cls;
3280 (void) key;
3282 return GNUNET_OK;
3283}
3284
3285
3291static void
3293{
3294 struct PendingMessage *pm;
3295 struct CoreSentContext *csc;
3296
3298 "free virtual link %p\n",
3299 vl);
3300
3301 if (NULL != vl->reassembly_map)
3302 {
3305 NULL);
3307 vl->reassembly_map = NULL;
3309 vl->reassembly_heap = NULL;
3310 }
3311 if (NULL != vl->reassembly_timeout_task)
3312 {
3315 }
3316 while (NULL != (pm = vl->pending_msg_head))
3320 if (NULL != vl->visibility_task)
3321 {
3323 vl->visibility_task = NULL;
3324 }
3325 if (NULL != vl->fc_retransmit_task)
3326 {
3328 vl->fc_retransmit_task = NULL;
3329 }
3330 while (NULL != (csc = vl->csc_head))
3331 {
3333 GNUNET_assert (vl == csc->vl);
3334 csc->vl = NULL;
3335 }
3336 GNUNET_break (NULL == vl->n);
3337 GNUNET_break (NULL == vl->dv);
3338 GNUNET_free (vl);
3339}
3340
3341
3347static void
3349{
3350 if (NULL != vs->revalidation_task)
3351 {
3352 GNUNET_SCHEDULER_cancel (vs->revalidation_task);
3353 vs->revalidation_task = NULL;
3354 }
3355 /*memcpy (&hkey,
3356 &hc,
3357 sizeof (hkey));*/
3359 "Remove key %s for address %s map size %u contains %u during freeing state\n",
3360 GNUNET_h2s (&vs->hc),
3361 vs->address,
3364 &vs->hc));
3367 GNUNET_YES ==
3370 vs->hn = NULL;
3371 if (NULL != vs->sc)
3372 {
3374 "store cancel\n");
3376 vs->sc = NULL;
3377 }
3378 GNUNET_free (vs->address);
3379 GNUNET_free (vs);
3380}
3381
3382
3389static struct Neighbour *
3391{
3393}
3394
3395
3402static struct VirtualLink *
3404{
3406}
3407
3408
3413{
3420
3425
3430
3435
3440};
3441
3442
3451static void
3453{
3454 struct Neighbour *n = dvh->next_hop;
3455 struct DistanceVector *dv = dvh->dv;
3456 struct PendingAcknowledgement *pa;
3457
3458 while (NULL != (pa = dvh->pa_head))
3459 {
3461 pa->dvh = NULL;
3462 }
3463 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3465 GNUNET_free (dvh);
3466}
3467
3468
3475static void
3476check_link_down (void *cls);
3477
3478
3484static void
3486{
3488 "Informing CORE clients about disconnect from %s\n",
3489 GNUNET_i2s (pid));
3490 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3491 {
3492 struct GNUNET_MQ_Envelope *env;
3493 struct DisconnectInfoMessage *dim;
3494
3495 if (CT_CORE != tc->type)
3496 continue;
3498 dim->peer = *pid;
3499 GNUNET_MQ_send (tc->mq, env);
3500 }
3501}
3502
3503
3510static void
3512{
3513 struct DistanceVectorHop *dvh;
3514
3515 while (NULL != (dvh = dv->dv_head))
3517 if (NULL == dv->dv_head)
3518 {
3519 struct VirtualLink *vl;
3520
3522 GNUNET_YES ==
3524 if (NULL != (vl = dv->vl))
3525 {
3526 GNUNET_assert (dv == vl->dv);
3527 vl->dv = NULL;
3528 if (NULL == vl->n)
3529 {
3531 free_virtual_link (vl);
3532 }
3533 else
3534 {
3537 }
3538 dv->vl = NULL;
3539 }
3540
3541 if (NULL != dv->timeout_task)
3542 {
3544 dv->timeout_task = NULL;
3545 }
3546 GNUNET_free (dv->km);
3547 GNUNET_free (dv);
3548 }
3549}
3550
3551
3565static void
3567 const struct GNUNET_PeerIdentity *peer,
3568 const char *address,
3570 const struct MonitorEvent *me)
3571{
3572 struct GNUNET_MQ_Envelope *env;
3574 size_t addr_len = strlen (address) + 1;
3575
3577 addr_len,
3579 md->nt = htonl ((uint32_t) nt);
3580 md->peer = *peer;
3581 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3582 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3583 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3584 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3585 md->cs = htonl ((uint32_t) me->cs);
3586 md->num_msg_pending = htonl (me->num_msg_pending);
3587 md->num_bytes_pending = htonl (me->num_bytes_pending);
3588 memcpy (&md[1], address, addr_len);
3589 GNUNET_MQ_send (tc->mq, env);
3590}
3591
3592
3602static void
3604 const char *address,
3606 const struct MonitorEvent *me)
3607{
3608 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3609 {
3610 if (CT_MONITOR != tc->type)
3611 continue;
3612 if (tc->details.monitor.one_shot)
3613 continue;
3614 if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3615 (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3616 continue;
3618 }
3619}
3620
3621
3631static void *
3633 struct GNUNET_SERVICE_Client *client,
3634 struct GNUNET_MQ_Handle *mq)
3635{
3636 struct TransportClient *tc;
3637
3638 (void) cls;
3639 tc = GNUNET_new (struct TransportClient);
3640 tc->client = client;
3641 tc->mq = mq;
3644 "Client %p of type %u connected\n",
3645 tc,
3646 tc->type);
3647 return tc;
3648}
3649
3650
3651static enum GNUNET_GenericReturnValue
3653 const struct GNUNET_PeerIdentity *pid,
3654 void *value)
3655{
3656 (void) cls;
3657 struct TransportGlobalNattedAddress *tgna = value;
3658
3659 GNUNET_free (tgna);
3660
3661 return GNUNET_OK;
3662}
3663
3664
3670static void
3671free_neighbour (struct Neighbour *neighbour)
3672{
3673 struct DistanceVectorHop *dvh;
3674 struct VirtualLink *vl;
3675
3676 GNUNET_assert (NULL == neighbour->queue_head);
3679 &neighbour->pid,
3680 neighbour));
3682 "Freeing neighbour\n");
3685 NULL);
3687 while (NULL != (dvh = neighbour->dv_head))
3688 {
3689 struct DistanceVector *dv = dvh->dv;
3690
3692 if (NULL == dv->dv_head)
3693 free_dv_route (dv);
3694 }
3695 if (NULL != neighbour->get)
3696 {
3698 neighbour->get = NULL;
3699 }
3700 if (NULL != neighbour->sc)
3701 {
3703 "store cancel\n");
3704 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3705 neighbour->sc = NULL;
3706 }
3707 if (NULL != (vl = neighbour->vl))
3708 {
3709 GNUNET_assert (neighbour == vl->n);
3710 vl->n = NULL;
3711 if (NULL == vl->dv)
3712 {
3715 }
3716 else
3717 {
3720 }
3721 neighbour->vl = NULL;
3722 }
3723 GNUNET_free (neighbour);
3724}
3725
3726
3733static void
3735 const struct GNUNET_PeerIdentity *pid)
3736{
3737 struct GNUNET_MQ_Envelope *env;
3738 struct ConnectInfoMessage *cim;
3739
3740 GNUNET_assert (CT_CORE == tc->type);
3742 cim->id = *pid;
3743 GNUNET_MQ_send (tc->mq, env);
3744}
3745
3746
3752static void
3754{
3756 "Informing CORE clients about connection to %s\n",
3757 GNUNET_i2s (pid));
3758 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3759 {
3760 if (CT_CORE != tc->type)
3761 continue;
3763 }
3764}
3765
3766
3774static void
3775transmit_on_queue (void *cls);
3776
3777
3781static unsigned int
3783{
3784 for (struct Queue *s = queue_head; NULL != s;
3785 s = s->next_client)
3786 {
3787 if (s->tc->details.communicator.address_prefix !=
3788 queue->tc->details.communicator.address_prefix)
3789 {
3791 "queue address %s qid %u compare with queue: address %s qid %u\n",
3792 queue->address,
3793 queue->qid,
3794 s->address,
3795 s->qid);
3796 if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3797 (QUEUE_LENGTH_LIMIT > s->queue_length) )
3798 return GNUNET_YES;
3800 "Lower prio\n");
3801 }
3802 }
3803 return GNUNET_NO;
3804}
3805
3806
3814static void
3816 struct Queue *queue,
3818{
3820
3821 if (queue->validated_until.abs_value_us < now.abs_value_us)
3822 return;
3824 queue->tc->details.communicator.
3825 queue_head))
3826 return;
3827
3828 if (queue->tc->details.communicator.total_queue_length >=
3830 {
3832 "Transmission on queue %s (QID %u) throttled due to communicator queue limit\n",
3833 queue->address,
3834 queue->qid);
3836 GST_stats,
3837 "# Transmission throttled due to communicator queue limit",
3838 1,
3839 GNUNET_NO);
3840 queue->idle = GNUNET_NO;
3841 return;
3842 }
3843 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3844 {
3846 "Transmission on queue %s (QID %u) throttled due to communicator queue length limit\n",
3847 queue->address,
3848 queue->qid);
3850 "# Transmission throttled due to queue queue limit",
3851 1,
3852 GNUNET_NO);
3853 queue->idle = GNUNET_NO;
3854 return;
3855 }
3856 if (0 == queue->q_capacity)
3857 {
3859 "Transmission on queue %s (QID %u) throttled due to communicator message has capacity %"
3860 PRIu64 ".\n",
3861 queue->address,
3862 queue->qid,
3863 queue->q_capacity);
3865 "# Transmission throttled due to message queue capacity",
3866 1,
3867 GNUNET_NO);
3868 queue->idle = GNUNET_NO;
3869 return;
3870 }
3871 /* queue might indeed be ready, schedule it */
3872 if (NULL != queue->transmit_task)
3873 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3874 queue->transmit_task =
3876 queue);
3878 "Considering transmission on queue `%s' QID %llu to %s\n",
3879 queue->address,
3880 (unsigned long long) queue->qid,
3881 GNUNET_i2s (&queue->neighbour->pid));
3882}
3883
3884
3891static void
3893{
3894 struct VirtualLink *vl = cls;
3895 struct DistanceVector *dv = vl->dv;
3896 struct Neighbour *n = vl->n;
3897 struct GNUNET_TIME_Absolute dvh_timeout;
3898 struct GNUNET_TIME_Absolute q_timeout;
3899
3901 "Checking if link is down\n");
3902 vl->visibility_task = NULL;
3903 dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3904 if (NULL != dv)
3905 {
3906 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3907 pos = pos->next_dv)
3908 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3909 pos->path_valid_until);
3910 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3911 {
3912 vl->dv->vl = NULL;
3913 vl->dv = NULL;
3914 }
3915 }
3916 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3917 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3918 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3919 if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3920 {
3921 vl->n->vl = NULL;
3922 vl->n = NULL;
3923 }
3924 if ((NULL == vl->n) && (NULL == vl->dv))
3925 {
3927 free_virtual_link (vl);
3928 return;
3929 }
3930 vl->visibility_task =
3931 GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3933 vl);
3934}
3935
3936
3942static void
3944{
3945 struct Neighbour *neighbour = queue->neighbour;
3946 struct TransportClient *tc = queue->tc;
3947 struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3949 struct QueueEntry *qe;
3950 int maxxed;
3951 struct PendingAcknowledgement *pa;
3952 struct VirtualLink *vl;
3953
3955 "Cleaning up queue %u\n", queue->qid);
3956 if (NULL != queue->mo)
3957 {
3959 queue->mo = NULL;
3960 }
3961 if (NULL != queue->transmit_task)
3962 {
3963 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3964 queue->transmit_task = NULL;
3965 }
3966 while (NULL != (pa = queue->pa_head))
3967 {
3968 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3969 pa->queue = NULL;
3970 }
3971
3973 neighbour->queue_head,
3974 neighbour->queue_tail,
3975 queue);
3977 tc->details.communicator.queue_head,
3978 tc->details.communicator.queue_tail,
3979 queue);
3981 tc->details.communicator.total_queue_length);
3983 "Cleaning up queue with length %u\n",
3984 queue->queue_length);
3985 while (NULL != (qe = queue->queue_head))
3986 {
3987 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3988 queue->queue_length--;
3989 tc->details.communicator.total_queue_length--;
3990 if (NULL != qe->pm)
3991 {
3992 GNUNET_assert (qe == qe->pm->qe);
3993 qe->pm->qe = NULL;
3994 }
3995 GNUNET_free (qe);
3996 }
3997 GNUNET_assert (0 == queue->queue_length);
3998 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3999 tc->details.communicator.total_queue_length))
4000 {
4001 /* Communicator dropped below threshold, resume all _other_ queues */
4003 GST_stats,
4004 "# Transmission throttled due to communicator queue limit",
4005 -1,
4006 GNUNET_NO);
4007 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
4008 s = s->next_client)
4010 s,
4012 }
4013 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
4015
4016 vl = lookup_virtual_link (&neighbour->pid);
4017 if ((NULL != vl) && (neighbour == vl->n))
4018 {
4020 check_link_down (vl);
4021 }
4022 if (NULL == neighbour->queue_head)
4023 {
4024 free_neighbour (neighbour);
4025 }
4026}
4027
4028
4034static void
4036{
4037 struct TransportClient *tc = ale->tc;
4038
4039 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
4040 tc->details.communicator.addr_tail,
4041 ale);
4042 if (NULL != ale->sc)
4043 {
4045 "store cancel\n");
4047 ale->sc = NULL;
4048 }
4049 if (NULL != ale->st)
4050 {
4052 ale->st = NULL;
4053 }
4054 if (NULL != ale->signed_address)
4056 GNUNET_free (ale);
4057}
4058
4059
4068static int
4070 const struct GNUNET_PeerIdentity *pid,
4071 void *value)
4072{
4073 struct TransportClient *tc = cls;
4074 struct PeerRequest *pr = value;
4075
4076 if (NULL != pr->nc)
4078 pr->nc = NULL;
4080 GNUNET_YES ==
4081 GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
4082 pid,
4083 pr));
4084 GNUNET_free (pr);
4085
4086 return GNUNET_OK;
4087}
4088
4089
4090static void
4091do_shutdown (void *cls);
4092
4101static void
4103 struct GNUNET_SERVICE_Client *client,
4104 void *app_ctx)
4105{
4106 struct TransportClient *tc = app_ctx;
4107
4108 (void) cls;
4109 (void) client;
4111 switch (tc->type)
4112 {
4113 case CT_NONE:
4115 "Unknown Client %p disconnected, cleaning up.\n",
4116 tc);
4117 break;
4118
4119 case CT_CORE: {
4121 "CORE Client %p disconnected, cleaning up.\n",
4122 tc);
4123
4124 struct PendingMessage *pm;
4125
4126 while (NULL != (pm = tc->details.core.pending_msg_head))
4127 {
4129 tc->details.core.pending_msg_head,
4130 tc->details.core.pending_msg_tail,
4131 pm);
4132 pm->client = NULL;
4133 }
4134 }
4135 break;
4136
4137 case CT_MONITOR:
4139 "MONITOR Client %p disconnected, cleaning up.\n",
4140 tc);
4141
4142 break;
4143
4144 case CT_COMMUNICATOR: {
4146 "COMMUNICATOR Client %p disconnected, cleaning up.\n",
4147 tc);
4148
4149 struct Queue *q;
4150 struct AddressListEntry *ale;
4151
4152 if (NULL != tc->details.communicator.free_queue_entry_task)
4154 tc->details.communicator.free_queue_entry_task);
4155 while (NULL != (q = tc->details.communicator.queue_head))
4156 free_queue (q);
4157 while (NULL != (ale = tc->details.communicator.addr_head))
4159 GNUNET_free (tc->details.communicator.address_prefix);
4160 }
4161 break;
4162
4163 case CT_APPLICATION:
4165 "APPLICATION Client %p disconnected, cleaning up.\n",
4166 tc);
4167
4168 GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
4170 tc);
4171 GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
4172 break;
4173 }
4174 GNUNET_free (tc);
4175 if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
4176 {
4178 "Our last client disconnected\n");
4179 do_shutdown (cls);
4180 }
4181}
4182
4183
4193static int
4195 const struct GNUNET_PeerIdentity *pid,
4196 void *value)
4197{
4198 struct TransportClient *tc = cls;
4199 struct VirtualLink *vl = value;
4200
4201 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4202 return GNUNET_OK;
4203
4205 "Telling new CORE client about existing connection to %s\n",
4206 GNUNET_i2s (pid));
4208 return GNUNET_OK;
4209}
4210
4211
4217static void
4219 unsigned
4220 int free_cmc);
4221
4222static enum GNUNET_GenericReturnValue
4224 const struct GNUNET_PeerIdentity *pid,
4225 void *value)
4226{
4227 struct VirtualLink *vl = value;
4228 struct CommunicatorMessageContext *cmc;
4229
4230 /* resume communicators */
4231 while (NULL != (cmc = vl->cmc_tail))
4232 {
4234 if (GNUNET_NO == cmc->continue_send)
4236 }
4237 return GNUNET_OK;
4238}
4239
4240
4249static void
4250handle_client_start (void *cls, const struct StartMessage *start)
4251{
4252 struct TransportClient *tc = cls;
4253 uint32_t options;
4254
4255 options = ntohl (start->options);
4256 if ((0 != (1 & options)) &&
4257 (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4258 {
4259 /* client thinks this is a different peer, reject */
4260 GNUNET_break (0);
4262 return;
4263 }
4264 if (CT_NONE != tc->type)
4265 {
4266 GNUNET_break (0);
4268 return;
4269 }
4270 tc->type = CT_CORE;
4272 "New CORE client with PID %s registered\n",
4273 GNUNET_i2s (&start->self));
4276 tc);
4279 NULL);
4281}
4282
4283
4290static int
4291check_client_send (void *cls, const struct OutboundMessage *obm)
4292{
4293 struct TransportClient *tc = cls;
4294 uint16_t size;
4295 const struct GNUNET_MessageHeader *obmm;
4296
4297 if (CT_CORE != tc->type)
4298 {
4299 GNUNET_break (0);
4300 return GNUNET_SYSERR;
4301 }
4302 size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4303 if (size < sizeof(struct GNUNET_MessageHeader))
4304 {
4305 GNUNET_break (0);
4306 return GNUNET_SYSERR;
4307 }
4308 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4309 if (size != ntohs (obmm->size))
4310 {
4311 GNUNET_break (0);
4312 return GNUNET_SYSERR;
4313 }
4314 return GNUNET_OK;
4315}
4316
4317
4325static void
4327{
4328 struct TransportClient *tc = pm->client;
4329 struct VirtualLink *vl = pm->vl;
4330
4332 "client send response\n");
4333 if (NULL != tc)
4334 {
4335 struct GNUNET_MQ_Envelope *env;
4336 struct SendOkMessage *so_msg;
4337
4339 so_msg->peer = vl->target;
4341 "Confirming transmission of <%" PRIu64 "> to %s\n",
4342 pm->logging_uuid,
4343 GNUNET_i2s (&vl->target));
4344 GNUNET_MQ_send (tc->mq, env);
4345 }
4347}
4348
4349
4359static unsigned int
4362 struct DistanceVectorHop **hops_array,
4363 unsigned int hops_array_length)
4364{
4365 uint64_t choices[hops_array_length];
4366 uint64_t num_dv;
4367 unsigned int dv_count;
4368
4369 /* Pick random vectors, but weighted by distance, giving more weight
4370 to shorter vectors */
4371 num_dv = 0;
4372 dv_count = 0;
4373 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4374 pos = pos->next_dv)
4375 {
4376 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4377 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4378 .rel_value_us == 0))
4379 continue; /* pos unconfirmed and confirmed required */
4380 num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4381 dv_count++;
4382 }
4383 if (0 == dv_count)
4384 return 0;
4385 if (dv_count <= hops_array_length)
4386 {
4387 dv_count = 0;
4388 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4389 pos = pos->next_dv)
4390 hops_array[dv_count++] = pos;
4391 return dv_count;
4392 }
4393 for (unsigned int i = 0; i < hops_array_length; i++)
4394 {
4395 int ok = GNUNET_NO;
4396 while (GNUNET_NO == ok)
4397 {
4398 choices[i] =
4400 ok = GNUNET_YES;
4401 for (unsigned int j = 0; j < i; j++)
4402 if (choices[i] == choices[j])
4403 {
4404 ok = GNUNET_NO;
4405 break;
4406 }
4407 }
4408 }
4409 dv_count = 0;
4410 num_dv = 0;
4411 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4412 pos = pos->next_dv)
4413 {
4414 uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4415
4416 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4417 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4418 .rel_value_us == 0))
4419 continue; /* pos unconfirmed and confirmed required */
4420 for (unsigned int i = 0; i < hops_array_length; i++)
4421 if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4422 hops_array[dv_count++] = pos;
4423 num_dv += delta;
4424 }
4425 return dv_count;
4426}
4427
4428
4435static int
4437 void *cls,
4439{
4440 struct TransportClient *tc = cls;
4441 uint16_t size;
4442
4443 if (CT_NONE != tc->type)
4444 {
4445 GNUNET_break (0);
4446 return GNUNET_SYSERR;
4447 }
4448 tc->type = CT_COMMUNICATOR;
4449 size = ntohs (cam->header.size) - sizeof(*cam);
4450 if (0 == size)
4451 return GNUNET_OK; /* receive-only communicator */
4453 return GNUNET_OK;
4454}
4455
4456
4462static void
4464 unsigned
4465 int free_cmc)
4466{
4467 if (0 != ntohl (cmc->im.fc_on))
4468 {
4469 /* send ACK when done to communicator for flow control! */
4470 struct GNUNET_MQ_Envelope *env;
4472
4474 "Acknowledge message with flow control id %" PRIu64 "\n",
4475 cmc->im.fc_id);
4477 ack->reserved = htonl (0);
4478 ack->fc_id = cmc->im.fc_id;
4479 ack->sender = cmc->im.neighbour_sender;
4480 GNUNET_MQ_send (cmc->tc->mq, env);
4481 }
4482
4484
4485 if (GNUNET_YES == free_cmc)
4486 {
4487 GNUNET_free (cmc);
4488 }
4489}
4490
4491
4492static void
4494{
4496}
4497
4498
4508static void
4509handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4510{
4511 struct TransportClient *tc = cls;
4512 struct VirtualLink *vl;
4513 uint32_t delta;
4514 struct CommunicatorMessageContext *cmc;
4515
4516 if (CT_CORE != tc->type)
4517 {
4518 GNUNET_break (0);
4520 return;
4521 }
4522 vl = lookup_virtual_link (&rom->peer);
4523 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4524 {
4526 "# RECV_OK dropped: virtual link unknown",
4527 1,
4528 GNUNET_NO);
4530 return;
4531 }
4532 delta = ntohl (rom->increase_window_delta);
4533 vl->core_recv_window += delta;
4535 "CORE ack receiving message, increased CORE recv window to %d\n",
4536 vl->core_recv_window);
4538 if (vl->core_recv_window <= 0)
4539 return;
4540 /* resume communicators */
4541 while (NULL != (cmc = vl->cmc_tail))
4542 {
4544 if (GNUNET_NO == cmc->continue_send)
4546 }
4547}
4548
4549
4556static void
4558 void *cls,
4560{
4561 struct TransportClient *tc = cls;
4562 uint16_t size;
4563
4564 size = ntohs (cam->header.size) - sizeof(*cam);
4565 if (0 == size)
4566 {
4568 "Receive-only communicator connected\n");
4569 return; /* receive-only communicator */
4570 }
4571 tc->details.communicator.address_prefix =
4572 GNUNET_strdup ((const char *) &cam[1]);
4573 tc->details.communicator.cc =
4576 "Communicator with prefix `%s' connected\n",
4577 tc->details.communicator.address_prefix);
4579}
4580
4581
4589static int
4591 void *cls,
4593{
4594 const struct GNUNET_MessageHeader *inbox;
4595 const char *is;
4596 uint16_t msize;
4597 uint16_t isize;
4598
4599 (void) cls;
4600 msize = ntohs (cb->header.size) - sizeof(*cb);
4601 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
4602 isize = ntohs (inbox->size);
4603 if (isize >= msize)
4604 {
4605 GNUNET_break (0);
4606 return GNUNET_SYSERR;
4607 }
4608 is = (const char *) inbox;
4609 is += isize;
4610 msize -= isize;
4611 GNUNET_assert (0 < msize);
4612 if ('\0' != is[msize - 1])
4613 {
4614 GNUNET_break (0);
4615 return GNUNET_SYSERR;
4616 }
4617 return GNUNET_OK;
4618}
4619
4620
4626static void
4628{
4629 struct EphemeralConfirmationPS ec;
4630
4632 dv->ephemeral_validity =
4635 ec.target = dv->target;
4638 ec.purpose.size = htonl (sizeof(ec));
4640 &ec,
4641 &dv->sender_sig);
4642}
4643
4644
4645static void
4647 struct TransportClient *tc);
4648
4649
4650static void
4652{
4653 struct TransportClient *tc = cls;
4655
4657 "freeing timedout queue entries\n");
4658
4659 tc->details.communicator.free_queue_entry_task = NULL;
4660 for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
4661 queue = queue->next_client)
4662 {
4663 struct QueueEntry *qep = queue->queue_head;
4664
4666 "checking QID %u for timedout queue entries\n",
4667 queue->qid);
4668 while (NULL != qep)
4669 {
4670 struct QueueEntry *pos = qep;
4671
4672 qep = qep->next;
4674 pos->creation_timestamp, now);
4675
4677 "diff to now %s \n",
4680 {
4682 "Freeing timed out QueueEntry with MID %" PRIu64
4683 " and QID %u\n",
4684 pos->mid,
4685 queue->qid);
4686 free_queue_entry (pos, tc);
4687 }
4688 }
4689 }
4690}
4691
4692
4702static void
4704 struct PendingMessage *pm,
4705 const void *payload,
4706 size_t payload_size)
4707{
4708 struct Neighbour *n = queue->neighbour;
4710 struct GNUNET_MQ_Envelope *env;
4711 struct PendingAcknowledgement *pa;
4712
4713 GNUNET_log (
4715 "Queueing %u bytes of payload for transmission <%" PRIu64
4716 "> on queue %llu to %s\n",
4717 (unsigned int) payload_size,
4718 (NULL == pm) ? 0 : pm->logging_uuid,
4719 (unsigned long long) queue->qid,
4720 GNUNET_i2s (&queue->neighbour->pid));
4721 env = GNUNET_MQ_msg_extra (smt,
4722 payload_size,
4724 smt->qid = htonl (queue->qid);
4725 smt->mid = GNUNET_htonll (queue->mid_gen);
4726 smt->receiver = n->pid;
4727 memcpy (&smt[1], payload, payload_size);
4728 {
4729 /* Pass the env to the communicator of queue for transmission. */
4730 struct QueueEntry *qe;
4731
4732 qe = GNUNET_new (struct QueueEntry);
4733 qe->creation_timestamp = GNUNET_TIME_absolute_get ();
4734 qe->mid = queue->mid_gen;
4736 "Create QueueEntry with MID %" PRIu64
4737 " and QID %u and prefix %s\n",
4738 qe->mid,
4739 queue->qid,
4740 queue->tc->details.communicator.address_prefix);
4741 queue->mid_gen++;
4742 qe->queue = queue;
4743 if (NULL != pm)
4744 {
4745 qe->pm = pm;
4746 // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4747 // GNUNET_assert (NULL == pm->qe);
4748 if (NULL != pm->qe)
4749 {
4751 "Retransmitting message <%" PRIu64
4752 "> remove pm from qe with MID: %llu \n",
4753 pm->logging_uuid,
4754 (unsigned long long) pm->qe->mid);
4755 pm->qe->pm = NULL;
4756 }
4757 pm->qe = qe;
4758 }
4759 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
4760 if (0 == queue->q_capacity)
4761 {
4762 // Messages without FC or fragments can get here.
4763 if (NULL != pm)
4764 {
4766 "Message %" PRIu64
4767 " (pm type %u) was not send because queue has no capacity.\n",
4768 pm->logging_uuid,
4769 pm->pmt);
4770 pm->qe = NULL;
4771 }
4772 GNUNET_free (env);
4773 GNUNET_free (qe);
4774 return;
4775 }
4776 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4777 queue->queue_length++;
4778 queue->tc->details.communicator.total_queue_length++;
4779 if (GNUNET_NO == queue->unlimited_length)
4780 queue->q_capacity--;
4782 "Queue %s with qid %u has capacity %" PRIu64 "\n",
4783 queue->address,
4784 queue->qid,
4785 queue->q_capacity);
4787 queue->tc->details.communicator.total_queue_length)
4788 queue->idle = GNUNET_NO;
4789 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4790 queue->idle = GNUNET_NO;
4791 if (0 == queue->q_capacity)
4792 queue->idle = GNUNET_NO;
4793
4794 if (GNUNET_NO == queue->idle)
4795 {
4796 struct TransportClient *tc = queue->tc;
4797
4798 if (NULL == tc->details.communicator.free_queue_entry_task)
4799 tc->details.communicator.free_queue_entry_task =
4801 &
4803 tc);
4804 }
4805 if (NULL != pm && NULL != (pa = pm->pa_head))
4806 {
4807 while (pm != pa->pm)
4808 pa = pa->next_pa;
4809 pa->num_send++;
4810 }
4811 // GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
4813 "Sending message MID %" PRIu64
4814 " of type %u (%u) and size %lu with MQ %p queue %s (QID %u) pending %"
4815 PRIu64 "\n",
4816 GNUNET_ntohll (smt->mid),
4817 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4818 ntohs (smt->header.size),
4819 (unsigned long) payload_size,
4820 queue->tc->mq,
4821 queue->address,
4822 queue->qid,
4823 (NULL == pm) ? 0 : pm->logging_uuid);
4824 GNUNET_MQ_send (queue->tc->mq, env);
4825 }
4826}
4827
4828
4839static struct GNUNET_TIME_Relative
4841 const struct GNUNET_MessageHeader *hdr,
4843{
4844 struct GNUNET_TIME_Absolute now;
4845 unsigned int candidates;
4846 unsigned int sel1;
4847 unsigned int sel2;
4848 struct GNUNET_TIME_Relative rtt;
4849
4850 /* Pick one or two 'random' queues from n (under constraints of options) */
4851 now = GNUNET_TIME_absolute_get ();
4852 /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4853 weight in the future; weight could be assigned by observed
4854 bandwidth (note: not sure if we should do this for this type
4855 of control traffic though). */
4856 candidates = 0;
4857 for (struct Queue *pos = n->queue_head; NULL != pos;
4858 pos = pos->next_neighbour)
4859 {
4860 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4861 (pos->validated_until.abs_value_us > now.abs_value_us))
4862 candidates++;
4863 }
4864 if (0 == candidates)
4865 {
4866 /* This can happen rarely if the last confirmed queue timed
4867 out just as we were beginning to process this message. */
4869 "Could not route message of type %u to %s: no valid queue\n",
4870 ntohs (hdr->type),
4871 GNUNET_i2s (&n->pid));
4873 "# route selection failed (all no valid queue)",
4874 1,
4875 GNUNET_NO);
4877 }
4878
4881 if (0 == (options & RMO_REDUNDANT))
4882 sel2 = candidates; /* picks none! */
4883 else
4885 candidates = 0;
4886 for (struct Queue *pos = n->queue_head; NULL != pos;
4887 pos = pos->next_neighbour)
4888 {
4889 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4890 (pos->validated_until.abs_value_us > now.abs_value_us))
4891 {
4892 if ((sel1 == candidates) || (sel2 == candidates))
4893 {
4895 "Routing message of type %u to %s using %s (#%u)\n",
4896 ntohs (hdr->type),
4897 GNUNET_i2s (&n->pid),
4898 pos->address,
4899 (sel1 == candidates) ? 1 : 2);
4900 rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4901 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4902 }
4903 candidates++;
4904 }
4905 }
4906 return rtt;
4907}
4908
4909
4914{
4918 gcry_cipher_hd_t cipher;
4919
4923 struct
4924 {
4929
4933 char aes_key[256 / 8];
4934
4938 char aes_ctr[128 / 8];
4940};
4941
4942
4951static void
4953 const struct GNUNET_ShortHashCode *iv,
4954 struct DVKeyState *key)
4955{
4956 /* must match what we defive from decapsulated key */
4958 GNUNET_CRYPTO_kdf (&key->material,
4959 sizeof(key->material),
4960 "transport-backchannel-key",
4961 strlen ("transport-backchannel-key"),
4962 km,
4963 sizeof(*km),
4964 iv,
4965 sizeof(*iv),
4966 NULL));
4968 "Deriving backchannel key based on KM %s and IV %s\n",
4969 GNUNET_h2s (km),
4970 GNUNET_sh2s (iv));
4971 GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4972 GCRY_CIPHER_AES256 /* low level: go for speed */,
4973 GCRY_CIPHER_MODE_CTR,
4974 0 /* flags */));
4975 GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4976 &key->material.aes_key,
4977 sizeof(key->material.aes_key)));
4978 gcry_cipher_setctr (key->cipher,
4979 &key->material.aes_ctr,
4980 sizeof(key->material.aes_ctr));
4981}
4982
4983
4993static void
4994dv_hmac (const struct DVKeyState *key,
4995 struct GNUNET_HashCode *hmac,
4996 const void *data,
4997 size_t data_size)
4998{
4999 GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
5000}
5001
5002
5012static void
5013dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
5014{
5015 GNUNET_assert (0 ==
5016 gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
5017}
5018
5019
5030static enum GNUNET_GenericReturnValue
5032 void *out,
5033 const void *ciph,
5034 size_t out_size)
5035{
5036 return (0 ==
5037 gcry_cipher_decrypt (key->cipher,
5038 out, out_size,
5039 ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
5040}
5041
5042
5048static void
5050{
5051 gcry_cipher_close (key->cipher);
5052 GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
5053}
5054
5055
5066typedef void (*DVMessageHandler) (void *cls,
5067 struct Neighbour *next_hop,
5068 const struct GNUNET_MessageHeader *hdr,
5070
5085static struct GNUNET_TIME_Relative
5087 unsigned int num_dvhs,
5088 struct DistanceVectorHop **dvhs,
5089 const struct GNUNET_MessageHeader *hdr,
5090 DVMessageHandler use,
5091 void *use_cls,
5093 enum GNUNET_GenericReturnValue without_fc)
5094{
5095 struct TransportDVBoxMessage box_hdr;
5096 struct TransportDVBoxPayloadP payload_hdr;
5097 uint16_t enc_body_size = ntohs (hdr->size);
5098 char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
5099 struct DVKeyState *key;
5100 struct GNUNET_TIME_Relative rtt;
5101 struct GNUNET_HashCode km;
5102
5103 key = GNUNET_new (struct DVKeyState);
5104 /* Encrypt payload */
5106 box_hdr.total_hops = htons (0);
5107 box_hdr.without_fc = htons (without_fc);
5108 // update_ephemeral (dv);
5109 if (0 ==
5110 GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
5111 {
5112 GNUNET_CRYPTO_eddsa_kem_encaps (&dv->target.public_key,
5113 &dv->ephemeral_key,
5114 &km);
5115 dv->km = GNUNET_new (struct GNUNET_HashCode);
5116 GNUNET_memcpy (dv->km, &km, sizeof(struct GNUNET_HashCode));
5117 sign_ephemeral (dv);
5118 }
5119 box_hdr.ephemeral_key = dv->ephemeral_key;
5120 payload_hdr.sender_sig = dv->sender_sig;
5121
5123 &box_hdr.iv,
5124 sizeof(box_hdr.iv));
5125 // We are creating this key, so this must work.
5126 // FIXME: Possibly also add return values here. We are processing
5127 // Input from other peers...
5128 dv_setup_key_state_from_km (dv->km, &box_hdr.iv, key);
5129 payload_hdr.sender = GST_my_identity;
5130 payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
5131 dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
5132 dv_encrypt (key,
5133 hdr,
5134 &enc[sizeof(struct TransportDVBoxPayloadP)],
5135 enc_body_size);
5136 dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
5137 dv_key_clean (key);
5139 /* For each selected path, take the pre-computed header and body
5140 and add the path in the middle of the message; then send it. */
5141 for (unsigned int i = 0; i < num_dvhs; i++)
5142 {
5143 struct DistanceVectorHop *dvh = dvhs[i];
5144 unsigned int num_hops = dvh->distance + 1;
5145 char buf[sizeof(struct TransportDVBoxMessage)
5146 + sizeof(struct GNUNET_PeerIdentity) * num_hops
5147 + sizeof(struct TransportDVBoxPayloadP)
5148 + enc_body_size] GNUNET_ALIGN;
5149 struct GNUNET_PeerIdentity *dhops;
5150
5151 box_hdr.header.size = htons (sizeof(buf));
5152 box_hdr.orig_size = htons (sizeof(buf));
5153 box_hdr.num_hops = htons (num_hops);
5154 memcpy (buf, &box_hdr, sizeof(box_hdr));
5155 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
5156 memcpy (dhops,
5157 dvh->path,
5158 dvh->distance * sizeof(struct GNUNET_PeerIdentity));
5159 dhops[dvh->distance] = dv->target;
5160 if (GNUNET_EXTRA_LOGGING > 0)
5161 {
5162 char *path;
5163
5165 for (unsigned int j = 0; j < num_hops; j++)
5166 {
5167 char *tmp;
5168
5169 GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
5170 GNUNET_free (path);
5171 path = tmp;
5172 }
5174 "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
5175 ntohs (hdr->type),
5176 GNUNET_i2s (&dv->target),
5177 i + 1,
5178 num_dvhs,
5179 path);
5180 GNUNET_free (path);
5181 }
5182 rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
5183 memcpy (&dhops[num_hops], enc, sizeof(enc));
5184 use (use_cls,
5185 dvh->next_hop,
5186 (const struct GNUNET_MessageHeader *) buf,
5187 options);
5188 GNUNET_free (key);
5189 }
5190 return rtt;
5191}
5192
5193
5203static void
5205 struct Neighbour *next_hop,
5206 const struct GNUNET_MessageHeader *hdr,
5208{
5209 (void) cls;
5210 (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
5211}
5212
5213
5225static struct GNUNET_TIME_Relative
5227// route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
5228 const struct GNUNET_MessageHeader *hdr,
5230{
5231 // struct VirtualLink *vl;
5232 struct Neighbour *n;
5233 struct DistanceVector *dv;
5234 struct GNUNET_TIME_Relative rtt1;
5235 struct GNUNET_TIME_Relative rtt2;
5236 const struct GNUNET_PeerIdentity *target = &vl->target;
5237
5239 "Trying to route message of type %u to %s without fc\n",
5240 ntohs (hdr->type),
5241 GNUNET_i2s (target));
5242
5243 // TODO Do this elsewhere. vl should be given as parameter to method.
5244 // vl = lookup_virtual_link (target);
5245 GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
5246 if (NULL == vl)
5248 n = vl->n;
5249 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
5250 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
5251 {
5252 /* if confirmed is required, and we do not have anything
5253 confirmed, drop respective options */
5254 if (NULL == n)
5255 n = lookup_neighbour (target);
5256 if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
5258 }
5259 if ((NULL == n) && (NULL == dv))
5260 {
5262 "Cannot route message of type %u to %s: no route\n",
5263 ntohs (hdr->type),
5264 GNUNET_i2s (target));
5266 "# Messages dropped in routing: no acceptable method",
5267 1,
5268 GNUNET_NO);
5270 }
5272 "Routing message of type %u to %s with options %X\n",
5273 ntohs (hdr->type),
5274 GNUNET_i2s (target),
5275 (unsigned int) options);
5276 /* If both dv and n are possible and we must choose:
5277 flip a coin for the choice between the two; for now 50/50 */
5278 if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5279 {
5281 n = NULL;
5282 else
5283 dv = NULL;
5284 }
5285 if ((NULL != n) && (NULL != dv))
5286 options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5287 enough for redundancy, so clear the flag. */
5290 if (NULL != n)
5291 {
5293 "Try to route message of type %u to %s without fc via neighbour\n",
5294 ntohs (hdr->type),
5295 GNUNET_i2s (target));
5296 rtt1 = route_via_neighbour (n, hdr, options);
5297 }
5298 if (NULL != dv)
5299 {
5300 struct DistanceVectorHop *hops[2];
5301 unsigned int res;
5302
5304 options,
5305 hops,
5306 (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5307 if (0 == res)
5308 {
5310 "Failed to route message, could not determine DV path\n");
5311 return rtt1;
5312 }
5314 "encapsulate_for_dv 1\n");
5315 rtt2 = encapsulate_for_dv (dv,
5316 res,
5317 hops,
5318 hdr,
5320 NULL,
5322 GNUNET_YES);
5323 }
5324 return GNUNET_TIME_relative_min (rtt1, rtt2);
5325}
5326
5327
5328static void
5329consider_sending_fc (void *cls);
5330
5337static void
5339{
5340 struct VirtualLink *vl = cls;
5341 vl->fc_retransmit_task = NULL;
5342 consider_sending_fc (cls);
5343}
5344
5345
5346static char *
5347get_address_without_port (const char *address);
5348
5349
5351{
5352 size_t off;
5353 char *tgnas;
5354};
5355
5356
5357static enum GNUNET_GenericReturnValue
5359 const struct GNUNET_PeerIdentity *pid,
5360 void *value)
5361{
5362 struct AddGlobalAddressesContext *ctx = cls;
5363 struct TransportGlobalNattedAddress *tgna = value;
5364 char *addr = (char *) &tgna[1];
5365 size_t address_len = strlen (addr);
5366
5368 "sending address %s length %lu\n",
5369 addr,
5370 address_len);
5371 tgna = GNUNET_malloc (sizeof (struct TransportGlobalNattedAddress) + address_len);
5372 tgna->address_length = htonl (address_len);
5373 GNUNET_memcpy (&tgna[1], addr, address_len);
5374 GNUNET_memcpy (&(ctx->tgnas[ctx->off]), tgna, sizeof (struct TransportGlobalNattedAddress) + address_len);
5375 GNUNET_free (tgna);
5376 ctx->off += sizeof(struct TransportGlobalNattedAddress) + address_len;
5377
5378 return GNUNET_OK;
5379}
5380
5381
5388static void
5390{
5391 struct VirtualLink *vl = cls;
5392 struct GNUNET_TIME_Absolute monotime;
5393 struct TransportFlowControlMessage *fc;
5395 struct GNUNET_TIME_Relative rtt;
5396 struct Neighbour *n = vl->n;
5397
5398 if (0 < n->number_of_addresses)
5399 {
5400 size_t addresses_size =
5401 n->number_of_addresses * sizeof (struct TransportGlobalNattedAddress) + n->size_of_global_addresses;
5402 char *tgnas = GNUNET_malloc (addresses_size);
5404 ctx.off = 0;
5405 ctx.tgnas = tgnas;
5406
5407 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage) + addresses_size);
5408 fc->header.size = htons (sizeof(struct TransportFlowControlMessage) + addresses_size);
5409 fc->size_of_addresses = htonl (n->size_of_global_addresses);
5410 fc->number_of_addresses = htonl (n->number_of_addresses);
5411 GNUNET_CONTAINER_multipeermap_iterate (n->natted_addresses,
5413 &ctx);
5414 GNUNET_memcpy (&fc[1], tgnas, addresses_size);
5416 }
5417 else
5418 {
5419 fc = GNUNET_malloc (sizeof (struct TransportFlowControlMessage));
5420 fc->header.size = htons (sizeof(struct TransportFlowControlMessage));
5421 }
5422
5424 /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5425 it always! */
5426 /* For example, we should probably ONLY do this if a bit more than
5427 an RTT has passed, or if the window changed "significantly" since
5428 then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5429 need an estimate for the bandwidth-delay-product for the entire
5430 VL, as that determines "significantly". We have the delay, but
5431 the bandwidth statistics need to be added for the VL!*/(void) duration;
5432
5434 "Sending FC seq %u to %s with new window %llu\n",
5435 (unsigned int) vl->fc_seq_gen,
5436 GNUNET_i2s (&vl->target),
5437 (unsigned long long) vl->incoming_fc_window_size);
5439 vl->last_fc_transmission = monotime;
5441 fc->seq = htonl (vl->fc_seq_gen++);
5447 fc->sender_time = GNUNET_TIME_absolute_hton (monotime);
5449 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5450 {
5453 "FC retransmission to %s failed, will retry in %s\n",
5454 GNUNET_i2s (&vl->target),
5457 }
5458 else
5459 {
5460 /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5461 vl->last_fc_rtt = rtt;
5462 }
5463 if (NULL != vl->fc_retransmit_task)
5466 {
5468 vl->fc_retransmit_count = 0;
5469 }
5470 vl->fc_retransmit_task =
5472 vl->fc_retransmit_count++;
5473 GNUNET_free (fc);
5474}
5475
5476
5493static void
5495{
5496 struct Neighbour *n = vl->n;
5497 struct DistanceVector *dv = vl->dv;
5498 struct GNUNET_TIME_Absolute now;
5499 struct VirtualLink *vl_next_hop;
5500 int elig;
5501
5503 "check_vl_transmission to target %s\n",
5504 GNUNET_i2s (&vl->target));
5505 /* Check that we have an eligible pending message!
5506 (cheaper than having #transmit_on_queue() find out!) */
5507 elig = GNUNET_NO;
5508 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5509 pm = pm->next_vl)
5510 {
5512 "check_vl_transmission loop\n");
5513 if (NULL != pm->qe)
5514 continue; /* not eligible, is in a queue! */
5515 if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5517 {
5519 "Stalled message %" PRIu64
5520 " transmission on VL %s due to flow control: %llu < %llu\n",
5521 pm->logging_uuid,
5522 GNUNET_i2s (&vl->target),
5523 (unsigned long long) vl->outbound_fc_window_size,
5524 (unsigned long long) (pm->bytes_msg
5527 return; /* We have a message, but flow control says "nope" */
5528 }
5530 "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5531 GNUNET_i2s (&vl->target));
5532 /* Notify queues at direct neighbours that we are interested */
5533 now = GNUNET_TIME_absolute_get ();
5534 if (NULL != n)
5535 {
5536 for (struct Queue *queue = n->queue_head; NULL != queue;
5537 queue = queue->next_neighbour)
5538 {
5539 if ((GNUNET_YES == queue->idle) &&
5540 (queue->validated_until.abs_value_us > now.abs_value_us))
5541 {
5543 "Direct neighbour %s not stalled\n",
5544 GNUNET_i2s (&n->pid));
5546 queue,
5548 elig = GNUNET_YES;
5549 }
5550 else
5552 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5553 queue->qid,
5554 queue->idle);
5555 }
5556 }
5557 /* Notify queues via DV that we are interested */
5558 if (NULL != dv)
5559 {
5560 /* Do DV with lower scheduler priority, which effectively means that
5561 IF a neighbour exists and is available, we prefer it. */
5562 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5563 pos = pos->next_dv)
5564 {
5565 struct Neighbour *nh = pos->next_hop;
5566
5567
5568 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5569 continue; /* skip this one: path not validated */
5570 else
5571 {
5572 vl_next_hop = lookup_virtual_link (&nh->pid);
5573 GNUNET_assert (NULL != vl_next_hop);
5574 if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5575 vl_next_hop->outbound_fc_window_size)
5576 {
5578 "Stalled message %" PRIu64
5579 " transmission on next hop %s due to flow control: %llu < %llu\n",
5580 pm->logging_uuid,
5581 GNUNET_i2s (&vl_next_hop->target),
5582 (unsigned long
5583 long) vl_next_hop->outbound_fc_window_size,
5584 (unsigned long long) (pm->bytes_msg
5585 + vl_next_hop->
5586 outbound_fc_window_size_used));
5587 consider_sending_fc (vl_next_hop);
5588 continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5589 }
5590 for (struct Queue *queue = nh->queue_head; NULL != queue;
5591 queue = queue->next_neighbour)
5592 if ((GNUNET_YES == queue->idle) &&
5593 (queue->validated_until.abs_value_us > now.abs_value_us))
5594 {
5596 "Next hop neighbour %s not stalled\n",
5597 GNUNET_i2s (&nh->pid));
5599 queue,
5601 elig = GNUNET_YES;
5602 }
5603 else
5605 "DV Queue QID: %u (%u) busy or invalid\n",
5606 queue->qid,
5607 queue->idle);
5608 }
5609 }
5610 }
5611 if (GNUNET_YES == elig)
5613 "Eligible message %" PRIu64 " of size %u to %s: %llu/%llu\n",
5614 pm->logging_uuid,
5615 pm->bytes_msg,
5616 GNUNET_i2s (&vl->target),
5617 (unsigned long long) vl->outbound_fc_window_size,
5618 (unsigned long long) (pm->bytes_msg
5620 break;
5621 }
5622}
5623
5624
5631static void
5632handle_client_send (void *cls, const struct OutboundMessage *obm)
5633{
5634 struct TransportClient *tc = cls;
5635 struct PendingMessage *pm;
5636 const struct GNUNET_MessageHeader *obmm;
5637 uint32_t bytes_msg;
5638 struct VirtualLink *vl;
5640
5641 GNUNET_assert (CT_CORE == tc->type);
5642 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5643 bytes_msg = ntohs (obmm->size);
5644 pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5645 vl = lookup_virtual_link (&obm->peer);
5646 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5647 {
5649 "Don't have %s as a neighbour (anymore).\n",
5650 GNUNET_i2s (&obm->peer));
5651 /* Failure: don't have this peer as a neighbour (anymore).
5652 Might have gone down asynchronously, so this is NOT
5653 a protocol violation by CORE. Still count the event,
5654 as this should be rare. */
5657 "# messages dropped (neighbour unknown)",
5658 1,
5659 GNUNET_NO);
5660 return;
5661 }
5662
5663 pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5665 "1 created pm %p storing vl %p\n",
5666 pm,
5667 vl);
5668 pm->logging_uuid = logging_uuid_gen++;
5669 pm->prefs = pp;
5670 pm->client = tc;
5671 pm->vl = vl;
5672 pm->bytes_msg = bytes_msg;
5673 memcpy (&pm[1], obmm, bytes_msg);
5675 "Sending message of type %u with %u bytes as <%" PRIu64
5676 "> to %s\n",
5677 ntohs (obmm->type),
5678 bytes_msg,
5679 pm->logging_uuid,
5680 GNUNET_i2s (&obm->peer));
5682 tc->details.core.pending_msg_head,
5683 tc->details.core.pending_msg_tail,
5684 pm);
5686 vl->pending_msg_head,
5687 vl->pending_msg_tail,
5688 pm);
5691}
5692
5693
5703static void
5705 void *cls,
5707{
5708 struct Neighbour *n;
5709 struct VirtualLink *vl;
5710 struct TransportClient *tc = cls;
5711 const struct GNUNET_MessageHeader *inbox =
5712 (const struct GNUNET_MessageHeader *) &cb[1];
5713 uint16_t isize = ntohs (inbox->size);
5714 const char *is = ((const char *) &cb[1]) + isize;
5715 size_t slen = strlen (is) + 1;
5716 char
5717 mbuf[slen + isize
5718 + sizeof(struct
5722
5723 /* 0-termination of 'is' was checked already in
5724 #check_communicator_backchannel() */
5726 "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5727 GNUNET_i2s (&cb->pid),
5728 is,
5729 ntohs (inbox->type),
5730 ntohs (inbox->size));
5731 /* encapsulate and encrypt message */
5732 be->header.type =
5734 be->header.size = htons (sizeof(mbuf));
5735 memcpy (&be[1], inbox, isize);
5736 memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5737 + isize],
5738 is,
5739 strlen (is) + 1);
5740 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5741 vl = lookup_virtual_link (&cb->pid);
5742 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5743 {
5745 }
5746 else
5747 {
5748 /* Use route via neighbour */
5749 n = lookup_neighbour (&cb->pid);
5750 if (NULL != n)
5752 n,
5753 &be->header,
5754 RMO_NONE);
5755 }
5757}
5758
5759
5767static int
5769 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5770{
5771 struct TransportClient *tc = cls;
5772
5773 if (CT_COMMUNICATOR != tc->type)
5774 {
5775 GNUNET_break (0);
5776 return GNUNET_SYSERR;
5777 }
5779 return GNUNET_OK;
5780}
5781
5782
5788static void
5789store_pi (void *cls);
5790
5791
5798static void
5799peerstore_store_own_cb (void *cls, int success)
5800{
5801 struct AddressListEntry *ale = cls;
5802
5803 ale->sc = NULL;
5804 if (GNUNET_YES != success)
5806 "Failed to store our own address `%s' in peerstore!\n",
5807 ale->address);
5808 else
5810 "Successfully stored our own address `%s' in peerstore!\n",
5811 ale->address);
5812 /* refresh period is 1/4 of expiration time, that should be plenty
5813 without being excessive. */
5814 ale->st =
5816 4ULL),
5817 &store_pi,
5818 ale);
5819}
5820
5821
5822static void
5823shc_cont (void *cls, int success)
5824{
5825 struct AddressListEntry *ale = cls;
5827
5830 "transport",
5833 ale->signed_address,
5834 ale->signed_address_len,
5835 expiration,
5838 ale);
5839 if (NULL == ale->sc)
5840 {
5842 "Failed to store our address `%s' with peerstore\n",
5843 ale->address);
5845 &store_pi,
5846 ale);
5847 }
5848}
5849
5850
5856static void
5857store_pi (void *cls)
5858{
5859 struct AddressListEntry *ale = cls;
5860 struct GNUNET_MQ_Envelope *env;
5861 const struct GNUNET_MessageHeader *msg;
5862 const char *dash;
5863 char *address_uri;
5865 unsigned int add_success;
5866
5867 dash = strchr (ale->address, '-');
5868 GNUNET_assert (NULL != dash);
5869 dash++;
5870 GNUNET_asprintf (&address_uri,
5871 "%s://%s",
5872 prefix,
5873 dash);
5875 ale->st = NULL;
5877 "Storing our address `%s' in peerstore until %s!\n",
5878 ale->address,
5881 address_uri);
5882 if (GNUNET_OK != add_success)
5883 {
5885 "Storing our address `%s' %s\n",
5886 address_uri,
5887 GNUNET_NO == add_success ? "not done" : "failed");
5888 GNUNET_free (address_uri);
5889 return;
5890 }
5891 else
5892 {
5893
5895 "Storing our address `%s'\n",
5896 address_uri);
5897 }
5898 // FIXME hello_mono_time used here?? What about expiration in ale?
5900 ale->nt,
5903 &ale->signed_address,
5904 &ale->signed_address_len);
5905 GNUNET_free (address_uri);
5911 "store_pi 1\n");
5913 msg,
5914 shc_cont,
5915 ale);
5916 GNUNET_free (env);
5917}
5918
5919
5920static struct AddressListEntry *
5924 const char *address,
5925 uint32_t aid,
5926 size_t slen)
5927{
5928 struct AddressListEntry *ale;
5929
5930 ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5931 ale->tc = tc;
5932 ale->address = (const char *) &ale[1];
5933 ale->expiration = expiration;
5934 ale->aid = aid;
5935 ale->nt = nt;
5936 memcpy (&ale[1], address, slen);
5937 ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5938
5939 return ale;
5940}
5941
5942
5949static void
5951 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5952{
5953 struct TransportClient *tc = cls;
5954 struct AddressListEntry *ale;
5955 size_t slen;
5956
5957 /* 0-termination of &aam[1] was checked in #check_add_address */
5959 "Communicator added address `%s'!\n",
5960 (const char *) &aam[1]);
5961 slen = ntohs (aam->header.size) - sizeof(*aam);
5962 ale = create_address_entry (tc,
5964 (enum GNUNET_NetworkType) ntohl (aam->nt),
5965 (const char *) &aam[1],
5966 aam->aid,
5967 slen);
5968 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5969 tc->details.communicator.addr_tail,
5970 ale);
5972}
5973
5974
5981static void
5983 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5984{
5985 struct TransportClient *tc = cls;
5986 struct AddressListEntry *alen;
5987
5988 if (CT_COMMUNICATOR != tc->type)
5989 {
5990 GNUNET_break (0);
5992 return;
5993 }
5994 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5995 NULL != ale;
5996 ale = alen)
5997 {
5998 alen = ale->next;
5999 if (dam->aid != ale->aid)
6000 continue;
6001 GNUNET_assert (ale->tc == tc);
6003 "Communicator deleted address `%s'!\n",
6004 ale->address);
6007 return;
6008 }
6010 "Communicator removed address we did not even have.\n");
6012 // GNUNET_SERVICE_client_drop (tc->client);
6013}
6014
6015
6023static void
6025
6026
6034static void
6036{
6037 struct CoreSentContext *ctx = cls;
6038 struct VirtualLink *vl = ctx->vl;
6039
6040 if (NULL == vl)
6041 {
6042 /* lost the link in the meantime, ignore */
6043 GNUNET_free (ctx);
6044 return;
6045 }
6048 vl->incoming_fc_window_size_ram -= ctx->size;
6049 vl->incoming_fc_window_size_used += ctx->isize;
6051 GNUNET_free (ctx);
6052}
6053
6054
6055static void
6057 const struct GNUNET_MessageHeader *mh,
6058 struct CommunicatorMessageContext *cmc,
6059 unsigned int free_cmc)
6060{
6061 uint16_t size = ntohs (mh->size);
6062 int have_core;
6063
6064 if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
6065 {
6067 "# CORE messages dropped (FC arithmetic overflow)",
6068 1,
6069 GNUNET_NO);
6071 "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
6072 (unsigned int) ntohs (mh->type),
6073 (unsigned int) ntohs (mh->size));
6074 if (GNUNET_YES == free_cmc)
6076 return;
6077 }
6079 {
6081 "# CORE messages dropped (FC window overflow)",
6082 1,
6083 GNUNET_NO);
6085 "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
6086 (unsigned int) ntohs (mh->type),
6087 (unsigned int) ntohs (mh->size));
6088 if (GNUNET_YES == free_cmc)
6090 return;
6091 }
6092
6093 /* Forward to all CORE clients */
6094 have_core = GNUNET_NO;
6095 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
6096 {
6097 struct GNUNET_MQ_Envelope *env;
6098 struct InboundMessage *im;
6099 struct CoreSentContext *ctx;
6100
6101 if (CT_CORE != tc->type)
6102 continue;
6105 ctx = GNUNET_new (struct CoreSentContext);
6106 ctx->vl = vl;
6107 ctx->size = size;
6108 ctx->isize = (GNUNET_NO == have_core) ? size : 0;
6109 have_core = GNUNET_YES;
6112 im->peer = cmc->im.sender;
6113 memcpy (&im[1], mh, size);
6114 GNUNET_MQ_send (tc->mq, env);
6116 }
6117 if (GNUNET_NO == have_core)
6118 {
6120 "Dropped message to CORE: no CORE client connected!\n");
6121 /* Nevertheless, count window as used, as it is from the
6122 perspective of the other peer! */
6124 /* TODO-M1 */
6126 "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
6127 (unsigned int) ntohs (mh->type),
6128 (unsigned int) ntohs (mh->size));
6129 if (GNUNET_YES == free_cmc)
6131 return;
6132 }
6134 "Delivered message from %s of type %u to CORE recv window %d\n",
6135 GNUNET_i2s (&cmc->im.sender),
6136 ntohs (mh->type),
6138 if (vl->core_recv_window > 0)
6139 {
6140 if (GNUNET_YES == free_cmc)
6142 return;
6143 }
6144 /* Wait with calling #finish_cmc_handling(cmc) until the message
6145 was processed by CORE MQs (for CORE flow control)! */
6146 if (GNUNET_YES == free_cmc)
6148}
6149
6150
6159static void
6161{
6162 struct CommunicatorMessageContext *cmc = cls;
6163 // struct CommunicatorMessageContext *cmc_copy =
6164 // GNUNET_new (struct CommunicatorMessageContext);
6165 struct GNUNET_MessageHeader *mh_copy;
6166 struct RingBufferEntry *rbe;
6167 struct VirtualLink *vl;
6168 uint16_t size = ntohs (mh->size);
6169
6171 "Handling raw message of type %u with %u bytes\n",
6172 (unsigned int) ntohs (mh->type),
6173 (unsigned int) ntohs (mh->size));
6174
6175 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
6176 (size < sizeof(struct GNUNET_MessageHeader)))
6177 {
6178 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6179
6180 GNUNET_break (0);
6181 finish_cmc_handling (cmc);
6183 return;
6184 }
6185 vl = lookup_virtual_link (&cmc->im.sender);
6186 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6187 {
6188 /* FIXME: sender is giving us messages for CORE but we don't have
6189 the link up yet! I *suspect* this can happen right now (i.e.
6190 sender has verified us, but we didn't verify sender), but if
6191 we pass this on, CORE would be confused (link down, messages
6192 arrive). We should investigate more if this happens often,
6193 or in a persistent manner, and possibly do "something" about
6194 it. Thus logging as error for now. */
6195
6196 mh_copy = GNUNET_malloc (size);
6197 rbe = GNUNET_new (struct RingBufferEntry);
6198 rbe->cmc = cmc;
6199 /*cmc_copy->tc = cmc->tc;
6200 cmc_copy->im = cmc->im;*/
6201 GNUNET_memcpy (mh_copy, mh, size);
6202
6203 rbe->mh = mh_copy;
6204
6206 {
6207 struct RingBufferEntry *rbe_old = ring_buffer[ring_buffer_head];
6208 GNUNET_free (rbe_old->cmc);
6209 GNUNET_free (rbe_old->mh);
6210 GNUNET_free (rbe_old);
6211 }
6212 ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
6213 // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6214 cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6216 "Storing message for %s and type %u (%u) in ring buffer head %u is full %u\n",
6217 GNUNET_i2s (&cmc->im.sender),
6218 (unsigned int) ntohs (mh->type),
6219 (unsigned int) ntohs (mh_copy->type),
6223 {
6224 ring_buffer_head = 0;
6226 }
6227 else
6229
6231 "%u items stored in ring buffer\n",
6234
6235 /*GNUNET_break_op (0);
6236 GNUNET_STATISTICS_update (GST_stats,
6237 "# CORE messages dropped (virtual link still down)",
6238 1,
6239 GNUNET_NO);
6240
6241 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6242 "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
6243 (unsigned int) ntohs (mh->type),
6244 (unsigned int) ntohs (mh->size));
6245 finish_cmc_handling (cmc);*/
6248 // GNUNET_free (cmc);
6249 return;
6250 }
6252}
6253
6254
6262static int
6264{
6265 uint16_t size = ntohs (fb->header.size);
6266 uint16_t bsize = size - sizeof(*fb);
6267
6268 (void) cls;
6269 if (0 == bsize)
6270 {
6271 GNUNET_break_op (0);
6272 return GNUNET_SYSERR;
6273 }
6274 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
6275 {
6276 GNUNET_break_op (0);
6277 return GNUNET_SYSERR;
6278 }
6279 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
6280 {
6281 GNUNET_break_op (0);
6282 return GNUNET_SYSERR;
6283 }
6284 return GNUNET_YES;
6285}
6286
6287
6293static void
6295{
6296 struct AcknowledgementCummulator *ac = cls;
6297
6298 ac->task = NULL;
6299 GNUNET_assert (0 == ac->num_acks);
6301 GNUNET_YES ==
6303 GNUNET_free (ac);
6304}
6305
6306
6312static void
6314{
6315 struct Neighbour *n;
6316 struct VirtualLink *vl;
6317 struct AcknowledgementCummulator *ac = cls;
6318 char buf[sizeof(struct TransportReliabilityAckMessage)
6319 + ac->num_acks
6321 struct TransportReliabilityAckMessage *ack =
6322 (struct TransportReliabilityAckMessage *) buf;
6324
6325 ac->task = NULL;
6327 "Sending ACK with %u components to %s\n",
6328 ac->num_acks,
6329 GNUNET_i2s (&ac->target));
6330 GNUNET_assert (0 < ac->num_acks);
6332 ack->header.size =
6333 htons (sizeof(*ack)
6334 + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
6335 ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
6336 ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
6337 for (unsigned int i = 0; i < ac->num_acks; i++)
6338 {
6339 ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
6342 }
6343 /*route_control_message_without_fc (
6344 &ac->target,
6345 &ack->header,
6346 RMO_DV_ALLOWED);*/
6347 vl = lookup_virtual_link (&ac->target);
6348 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6349 {
6351 vl,
6352 &ack->header,
6354 }
6355 else
6356 {
6357 /* Use route via neighbour */
6358 n = lookup_neighbour (&ac->target);
6359 if (NULL != n)
6361 n,
6362 &ack->header,
6363 RMO_NONE);
6364 }
6365 ac->num_acks = 0;
6368 ac);
6369}
6370
6371
6380static void
6382 const struct AcknowledgementUUIDP *ack_uuid,
6383 struct GNUNET_TIME_Absolute max_delay)
6384{
6385 struct AcknowledgementCummulator *ac;
6386
6388 "Scheduling ACK %s for transmission to %s\n",
6389 GNUNET_uuid2s (&ack_uuid->value),
6390 GNUNET_i2s (pid));
6392 if (NULL == ac)
6393 {
6395 ac->target = *pid;
6396 ac->min_transmission_time = max_delay;
6400 &ac->target,
6401 ac,
6403 }
6404 else
6405 {
6406 if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
6407 {
6408 /* must run immediately, ack buffer full! */
6410 }
6414 }
6417 ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6418 ac->num_acks++;
6421 ac);
6422}
6423
6424
6429{
6434
6439};
6440
6441
6451static int
6452find_by_message_uuid (void *cls, uint32_t key, void *value)
6453{
6454 struct FindByMessageUuidContext *fc = cls;
6455 struct ReassemblyContext *rc = value;
6456
6457 (void) key;
6458 if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6459 {
6460 fc->rc = rc;
6461 return GNUNET_NO;
6462 }
6463 return GNUNET_YES;
6464}
6465
6466
6474static void
6476{
6477 struct CommunicatorMessageContext *cmc = cls;
6478 struct VirtualLink *vl;
6479 struct ReassemblyContext *rc;
6480 const struct GNUNET_MessageHeader *msg;
6481 uint16_t msize;
6482 uint16_t fsize;
6483 uint16_t frag_off;
6484 char *target;
6485 struct GNUNET_TIME_Relative cdelay;
6486 struct FindByMessageUuidContext fc;
6487
6488 vl = lookup_virtual_link (&cmc->im.sender);
6489 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6490 {
6491 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6492
6494 "No virtual link for %s to handle fragment\n",
6495 GNUNET_i2s (&cmc->im.sender));
6496 GNUNET_break (0);
6497 finish_cmc_handling (cmc);
6499 return;
6500 }
6501 if (NULL == vl->reassembly_map)
6502 {
6504 vl->reassembly_heap =
6509 vl);
6510 }
6511 msize = ntohs (fb->msg_size);
6512 fc.message_uuid = fb->msg_uuid;
6513 fc.rc = NULL;
6515 fb->msg_uuid.uuid,
6517 &fc);
6518 fsize = ntohs (fb->header.size) - sizeof(*fb);
6519 if (NULL == (rc = fc.rc))
6520 {
6521 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6522 + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6523 rc->msg_uuid = fb->msg_uuid;
6524 rc->virtual_link = vl;
6525 rc->msg_size = msize;
6526 rc->reassembly_timeout =
6530 rc,
6534 vl->reassembly_map,
6535 rc->msg_uuid.uuid,
6536 rc,
6538 target = (char *) &rc[1];
6539 rc->bitfield = (uint8_t *) (target + rc->msg_size);
6540 if (fsize != rc->msg_size)
6541 rc->msg_missing = rc->msg_size;
6542 else
6543 rc->msg_missing = 0;
6545 "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %"
6546 PRIu64 "\n",
6547 fsize,
6548 ntohs (fb->frag_off),
6549 msize,
6550 rc->msg_missing,
6551 GNUNET_i2s (&cmc->im.sender),
6552 fb->msg_uuid.uuid);
6553 }
6554 else
6555 {
6556 target = (char *) &rc[1];
6558 "Received fragment at offset %u/%u from %s for message %u\n",
6559 ntohs (fb->frag_off),
6560 msize,
6561 GNUNET_i2s (&cmc->im.sender),
6562 (unsigned int) fb->msg_uuid.uuid);
6563 }
6564 if (msize != rc->msg_size)
6565 {
6566 GNUNET_break (0);
6567 finish_cmc_handling (cmc);
6568 return;
6569 }
6570
6571 /* reassemble */
6572 if (0 == fsize)
6573 {
6574 GNUNET_break (0);
6575 finish_cmc_handling (cmc);
6576 return;
6577 }
6578 frag_off = ntohs (fb->frag_off);
6579 if (frag_off + fsize > msize)
6580 {
6581 /* Fragment (plus fragment size) exceeds message size! */
6582 GNUNET_break_op (0);
6583 finish_cmc_handling (cmc);
6584 return;
6585 }
6586 memcpy (&target[frag_off], &fb[1], fsize);
6587 /* update bitfield and msg_missing */
6588 for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6589 {
6590 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6591 {
6592 rc->bitfield[i / 8] |= (1 << (i % 8));
6593 rc->msg_missing--;
6594 }
6595 }
6596
6597 /* Compute cumulative ACK */
6599 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6600 if (0 == rc->msg_missing)
6601 cdelay = GNUNET_TIME_UNIT_ZERO;
6602 cummulative_ack (&cmc->im.sender,
6603 &fb->ack_uuid,
6606 /* is reassembly complete? */
6607 if (0 != rc->msg_missing)
6608 {
6609 finish_cmc_handling (cmc);
6610 return;
6611 }
6612 /* reassembly is complete, verify result */
6613 msg = (const struct GNUNET_MessageHeader *) &rc[1];
6614 if (ntohs (msg->size) != rc->msg_size)
6615 {
6616 GNUNET_break (0);
6618 finish_cmc_handling (cmc);
6619 return;
6620 }
6621 /* successful reassembly */
6623 "Fragment reassembly complete for message %u\n",
6624 (unsigned int) fb->msg_uuid.uuid);
6625 /* FIXME: check that the resulting msg is NOT a
6626 DV Box or Reliability Box, as that is NOT allowed! */
6627 cmc->mh = msg;
6629 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6630 en-route and we forget that we finished this reassembly immediately!
6631 -> keep around until timeout?
6632 -> shorten timeout based on ACK? */
6634}
6635
6636
6644static int
6646 const struct TransportReliabilityBoxMessage *rb)
6647{
6648 (void) cls;
6649 const struct GNUNET_MessageHeader *inbox = (const struct
6650 GNUNET_MessageHeader *) &rb[1];
6651
6653 "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6654 ntohs (rb->header.size),
6655 ntohs (inbox->type),
6656 ntohs (inbox->size),
6657 sizeof (struct TransportReliabilityBoxMessage),
6658 sizeof (struct GNUNET_MessageHeader));
6660 return GNUNET_YES;
6661}
6662
6663
6671static void
6673 const struct TransportReliabilityBoxMessage *rb)
6674{
6675 struct CommunicatorMessageContext *cmc = cls;
6676 const struct GNUNET_MessageHeader *inbox =
6677 (const struct GNUNET_MessageHeader *) &rb[1];
6678 struct GNUNET_TIME_Relative rtt;
6679
6681 "Received reliability box from %s with UUID %s of type %u\n",
6682 GNUNET_i2s (&cmc->im.sender),
6684 (unsigned int) ntohs (inbox->type));
6685 rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6686 do not really have an RTT for the
6687 * incoming* queue (should we have
6688 the sender add it to the rb message?) */
6690 &cmc->im.sender,
6691 &rb->ack_uuid,
6692 (0 == ntohl (rb->ack_countdown))
6695 GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6696 /* continue with inner message */
6697 /* FIXME: check that inbox is NOT a DV Box, fragment or another
6698 reliability box (not allowed!) */
6699 cmc->mh = inbox;
6701}
6702
6703
6712static void
6713update_pd_age (struct PerformanceData *pd, unsigned int age)
6714{
6715 unsigned int sage;
6716
6717 if (age == pd->last_age)
6718 return; /* nothing to do */
6719 sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6720 for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6721 {
6722 struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6723
6724 the->bytes_sent = 0;
6725 the->bytes_received = 0;
6726 }
6727 pd->last_age = age;
6728}
6729
6730
6739static void
6741 struct GNUNET_TIME_Relative rtt,
6742 uint16_t bytes_transmitted_ok)
6743{
6744 uint64_t nval = rtt.rel_value_us;
6745 uint64_t oval = pd->aged_rtt.rel_value_us;
6746 unsigned int age = get_age ();
6747 struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6748
6749 if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6750 pd->aged_rtt = rtt;
6751 else
6752 pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
6753 update_pd_age (pd, age);
6754 the->bytes_received += bytes_transmitted_ok;
6755}
6756
6757
6765static void
6767 struct GNUNET_TIME_Relative rtt,
6768 uint16_t bytes_transmitted_ok)
6769{
6770 update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6771}
6772
6773
6781static void
6783 struct GNUNET_TIME_Relative rtt,
6784 uint16_t bytes_transmitted_ok)
6785{
6786 update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6787}
6788
6789
6797static void
6799{
6800 struct PendingMessage *pos;
6801
6803 "Complete transmission of message %" PRIu64 " %u\n",
6804 pm->logging_uuid,
6805 pm->pmt);
6806 switch (pm->pmt)
6807 {
6808 case PMT_CORE:
6810 /* Full message sent, we are done */
6812 return;
6813
6814 case PMT_FRAGMENT_BOX:
6815 /* Fragment sent over reliable channel */
6816 pos = pm->frag_parent;
6820 "pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
6821 (unsigned long) pos->frag_off,
6822 (unsigned long) pos->bytes_msg,
6823 pos->pmt,
6824 NULL == pos->frag_parent ? 1 : 0);
6825 /* check if subtree is done */
6826 while ((NULL == pos->head_frag) && (pos->frag_off == (pos->bytes_msg
6827 - sizeof(struct
6829 &&
6830 (NULL != pos->frag_parent))
6831 {
6832 pm = pos;
6833 pos = pm->frag_parent;
6834 if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6835 {
6837 return;
6838 }
6839 else if (PMT_DV_BOX == pm->pmt)
6840 {
6842 return;
6843 }
6846 }
6847
6848 /* Was this the last applicable fragment? */
6849 if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
6850 pos->pmt) &&
6851 (pos->frag_off == pos->bytes_msg))
6853 return;
6854
6855 case PMT_DV_BOX:
6857 "Completed transmission of message %" PRIu64 " (DV Box)\n",
6858 pm->logging_uuid);
6859 if (NULL != pm->frag_parent)
6860 {
6861 pos = pm->frag_parent;
6863 pos->bpm = NULL;
6865 }
6866 else
6868 return;
6869 }
6870}
6871
6872
6880static void
6882 struct GNUNET_TIME_Relative ack_delay)
6883{
6884 struct GNUNET_TIME_Relative delay;
6885
6887 delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6888 if (NULL != pa->queue && 1 == pa->num_send)
6890 if (NULL != pa->dvh && 1 == pa->num_send)
6891 update_dvh_performance (pa->dvh, delay, pa->message_size);
6892 if (NULL != pa->pm)
6895}
6896
6897
6905static int
6907 const struct TransportReliabilityAckMessage *ra)
6908{
6909 unsigned int n_acks;
6910
6911 (void) cls;
6912 n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6913 / sizeof(struct TransportCummulativeAckPayloadP);
6914 if (0 == n_acks)
6915 {
6916 GNUNET_break_op (0);
6917 return GNUNET_SYSERR;
6918 }
6919 if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6920 n_acks * sizeof(struct