GNUnet 0.21.0
gnunet-service-transport.c
Go to the documentation of this file.
1/*
2 This file is part of GNUnet.
3 Copyright (C) 2010-2016, 2018, 2019 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
75#include "platform.h"
76#include "gnunet_util_lib.h"
81#include "gnunet_signatures.h"
82#include "transport.h"
83
87#define RING_BUFFER_SIZE 16
88
92#define MAX_FC_RETRANSMIT_COUNT 1000
93
98#define MAX_CUMMULATIVE_ACKS 64
99
112#define FC_NO_CHANGE_REPLY_PROBABILITY 8
113
118#define IN_PACKET_SIZE_WITHOUT_MTU 128
119
124#define GOODPUT_AGING_SLOTS 4
125
130#define DEFAULT_WINDOW_SIZE (128 * 1024)
131
140#define MAX_INCOMING_REQUEST 16
141
146#define MAX_DV_DISCOVERY_SELECTION 16
147
156#define RECV_WINDOW_SIZE 4
157
165#define MIN_DV_PATH_LENGTH_FOR_INITIATOR 3
166
170#define MAX_DV_HOPS_ALLOWED 16
171
176#define MAX_DV_LEARN_PENDING 64
177
181#define MAX_DV_PATHS_TO_TARGET 3
182
188#define DELAY_WARN_THRESHOLD \
189 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
190
195#define DV_FORWARD_TIMEOUT \
196 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
197
201#define DEFAULT_ACK_WAIT_DURATION \
202 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
203
209#define DV_QUALITY_RTT_THRESHOLD \
210 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
211
216#define DV_PATH_VALIDITY_TIMEOUT \
217 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
218
223#define BACKCHANNEL_INACTIVITY_TIMEOUT \
224 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
225
230#define DV_PATH_DISCOVERY_FREQUENCY \
231 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
232
236#define EPHEMERAL_VALIDITY \
237 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
238
242#define REASSEMBLY_EXPIRATION \
243 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 4)
244
249#define FAST_VALIDATION_CHALLENGE_FREQ \
250 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
251
255#define MAX_VALIDATION_CHALLENGE_FREQ \
256 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
257
263#define ACK_CUMMULATOR_TIMEOUT \
264 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
265
270#define DV_LEARN_BASE_FREQUENCY GNUNET_TIME_UNIT_MINUTES
271
276#define DV_LEARN_QUALITY_THRESHOLD 100
277
281#define MAX_ADDRESS_VALID_UNTIL \
282 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MONTHS, 1)
283
287#define ADDRESS_VALIDATION_LIFETIME \
288 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 4)
289
296#define MIN_DELAY_ADDRESS_VALIDATION GNUNET_TIME_UNIT_MILLISECONDS
297
304#define VALIDATION_RTT_BUFFER_FACTOR 3
305
312#define COMMUNICATOR_TOTAL_QUEUE_LIMIT 512
313
319#define QUEUE_LENGTH_LIMIT 32
320
324#define QUEUE_ENTRY_TIMEOUT \
325 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
326
327
329
334{
340};
341
342
347{
352};
353
358{
363
364 /* Followed by *another* message header which is the message to
365 the communicator */
366
367 /* Followed by a 0-terminated name of the communicator */
368};
369
370
375{
380
396
401
407};
408
409
415{
420
426
438
439 /* Followed by a `struct GNUNET_MessageHeader` with a message
440 for the target peer */
441};
442
443
449{
454
462
469};
470
471
476{
484
489};
490
491
500{
505
511
512 /* followed by any number of `struct TransportCummulativeAckPayloadP`
513 messages providing ACKs */
514};
515
516
521{
526
531
536
545
551};
552
553
572{
577
591
596};
597
598
616{
621
626
631
636};
637
638
644{
649
655};
656
657
672{
677
683
693
700
714
720
725
730
731 /* Followed by @e num_hops `struct DVPathEntryP` values,
732 excluding the initiator of the DV trace; the last entry is the
733 current sender; the current peer must not be included. */
734};
735
736
760{
765
769 unsigned int without_fc;
770
778
785
791
798
805
812
813 /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
814 excluding the @e origin and the current peer, the last must be
815 the ultimate target; if @e num_hops is zero, the receiver of this
816 message is the ultimate target. */
817
818 /* Followed by encrypted, variable-size payload, which
819 must begin with a `struct TransportDVBoxPayloadP` */
820
821 /* Followed by the actual message, which itself must not be
822 a DV_LEARN or DV_BOX message! */
823};
824
825
831{
836
841
846
852};
853
854
860{
865
871
876};
877
878
884{
889
894
900
905
910 struct GNUNET_TIME_AbsoluteNBO origin_time;
911
916 struct GNUNET_TIME_RelativeNBO validity_duration;
917};
918
919
929{
934
943
949
956
966
976};
977
978
980
981
986{
991
996
1001
1006
1010 CT_APPLICATION = 4
1012
1013
1019{
1024
1029
1034
1039
1045 RMO_REDUNDANT = 4
1047
1048
1053{
1058
1063
1068
1074};
1075
1076
1082{
1086 uint64_t bytes_sent;
1087
1093};
1094
1095
1100{
1105
1111
1116 unsigned int last_age;
1117};
1118
1119
1123struct TransportClient;
1124
1128struct Neighbour;
1129
1134struct DistanceVector;
1135
1140struct Queue;
1141
1145struct PendingMessage;
1146
1150struct DistanceVectorHop;
1151
1160struct VirtualLink;
1161
1162
1168{
1174
1180
1185
1190
1195
1200 uint16_t total_hops;
1201
1205 unsigned int continue_send;
1206};
1207
1208
1213{
1218
1223};
1224
1225
1230{
1235
1240
1245
1249 uint16_t size;
1250
1257 uint16_t isize;
1258};
1259
1260
1265{
1271
1276
1281
1289 uint8_t *bitfield;
1290
1295
1301
1305 uint16_t msg_size;
1306
1311 uint16_t msg_missing;
1312
1313 /* Followed by @e msg_size bytes of the (partially) defragmented original
1314 * message */
1315
1316 /* Followed by @e bitfield data */
1317};
1318
1319
1329{
1334
1341
1348
1353
1359
1365
1370
1375
1380
1385
1393
1399
1404
1409 unsigned int confirmed;
1410
1414 struct Neighbour *n;
1415
1420
1427
1434
1443
1449
1455
1464
1472
1479
1488
1501
1507
1514
1525
1530 uint32_t fc_seq_gen;
1531
1537 uint32_t last_fc_seq;
1538
1551};
1552
1553
1558{
1564
1570
1577
1584
1591
1598
1605
1612
1617
1623
1629
1634 struct Queue *queue;
1635
1640
1645
1649 unsigned int num_send;
1650};
1651
1652
1657{
1662
1667
1672
1677
1682
1687
1692
1697
1704
1710
1719
1724
1730 unsigned int distance;
1731};
1732
1733
1739{
1744
1749
1754
1759
1765
1771
1776
1781
1786
1791};
1792
1793
1804{
1809
1814
1818 struct Queue *queue;
1819
1824
1828 uint64_t mid;
1829
1834};
1835
1836
1841struct Queue
1842{
1847
1852
1857
1862
1867
1872
1877
1882
1887
1892
1896 const char *address;
1897
1901 unsigned int unlimited_length;
1902
1908
1917
1922
1927 uint64_t mid_gen;
1928
1932 uint32_t qid;
1933
1937 uint32_t mtu;
1938
1943
1948
1952 unsigned int queue_length;
1953
1957 uint64_t q_capacity;
1958
1962 uint32_t priority;
1963
1968
1973
1978 int idle;
1979};
1980
1981
1986{
1991
1997
2003
2008
2013
2019
2025
2031
2037
2043};
2044
2045
2051{
2056
2061
2066
2071};
2072
2073
2077struct PeerRequest
2078{
2083
2088
2093
2100
2105};
2106
2107
2112{
2117
2122
2127
2131 PMT_DV_BOX = 3
2133
2134
2161struct PendingMessage
2162{
2167
2172
2177
2182
2188
2194
2199
2204
2210
2216
2221
2231
2236
2241
2246
2251
2256
2261
2267
2273
2278
2284
2289
2293 uint16_t bytes_msg;
2294
2298 uint16_t frag_off;
2299
2304
2309
2313 uint16_t frag_count;
2314
2319
2320 /* Followed by @e bytes_msg to transmit */
2321};
2322
2323
2328{
2334
2339};
2340
2341
2347{
2352
2357
2364
2369
2375 uint32_t ack_counter;
2376
2380 unsigned int num_acks;
2381};
2382
2383
2388{
2393
2398
2403
2408
2412 const char *address;
2413
2418
2423
2428
2433
2439
2443 uint32_t aid;
2444
2449};
2450
2451
2456{
2461
2466
2471
2476
2481
2482 union
2483 {
2487 struct
2488 {
2494
2500
2504 struct
2505 {
2512
2518
2519
2523 struct
2524 {
2530
2535
2540
2546
2552
2559
2564
2570
2574 struct
2575 {
2583};
2584
2585
2591{
2597
2605
2611
2618 struct GNUNET_TIME_Absolute first_challenge_use;
2619
2626 struct GNUNET_TIME_Absolute last_challenge_use;
2627
2635 struct GNUNET_TIME_Absolute next_challenge;
2636
2645 struct GNUNET_TIME_Relative challenge_backoff;
2646
2651 struct GNUNET_TIME_Relative validation_rtt;
2652
2660 struct GNUNET_CRYPTO_ChallengeNonceP challenge;
2661
2665 struct GNUNET_HashCode hc;
2666
2670 struct GNUNET_SCHEDULER_Task *revalidation_task;
2671
2675 char *address;
2676
2682 struct GNUNET_CONTAINER_HeapNode *hn;
2683
2689
2695 uint32_t last_window_consum_limit;
2696
2701 int awaiting_queue;
2702};
2703
2704
2712{
2717
2722
2727
2732
2738
2743
2749
2755
2761};
2762
2763
2768
2772static unsigned int ring_buffer_head;
2773
2777static unsigned int is_ring_buffer_full;
2778
2783
2787static unsigned int ring_buffer_dv_head;
2788
2792static unsigned int is_ring_buffer_dv_full;
2793
2798
2803
2808
2813
2818
2823
2828
2834
2840
2846
2852
2858
2864
2870
2876
2881
2885static struct LearnLaunchEntry *lle_head = NULL;
2886
2890static struct LearnLaunchEntry *lle_tail = NULL;
2891
2898
2903
2908
2913
2920
2925
2929static unsigned int ir_total;
2930
2934static unsigned long long logging_uuid_gen;
2935
2945
2950static int in_shutdown;
2951
/**
 * Return the current time as a coarse counter: the current absolute time
 * is divided by one GNUNET_TIME_UNIT_MINUTES and then by 15, i.e. the
 * result advances once every 15 minutes. The quotient is narrowed to
 * `unsigned int` by the return type.
 *
 * NOTE(review): the line carrying this function's name and parameter list
 * is missing from this listing.
 *
 * @return number of 15-minute intervals contained in the current time
 */
2962static unsigned int
2964{
2965 struct GNUNET_TIME_Absolute now;
2966
2967 now = GNUNET_TIME_absolute_get ();
 /* Integer division, evaluated left to right: minutes first, then
    groups of 15 minutes. */
2968 return now.abs_value_us / GNUNET_TIME_UNIT_MINUTES.rel_value_us / 15;
2969}
2970
2971
2977static void
2979{
2981 GNUNET_assert (ir_total > 0);
2982 ir_total--;
2983 if (NULL != ir->nc)
2985 ir->nc = NULL;
2986 GNUNET_free (ir);
2987}
2988
2989
/**
 * Release a pending acknowledgement: unlink @a pa from the per-queue,
 * per-message and per-DV-hop lists it still belongs to, then free it.
 *
 * NOTE(review): the name/parameter line and the opening lines of the
 * logging and assertion calls are missing from this listing; only the
 * visible statements are described here.
 */
2995static void
2997{
 /* Snapshot the three back-references before any of them is cleared. */
2998 struct Queue *q = pa->queue;
2999 struct PendingMessage *pm = pa->pm;
3000 struct DistanceVectorHop *dvh = pa->dvh;
3001
3003 "free_pending_acknowledgement\n");
3004 if (NULL != q)
3005 {
 /* Detach from the queue's list of pending acknowledgements. */
3006 GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
3007 pa->queue = NULL;
3008 }
3009 if (NULL != pm)
3010 {
3012 "remove pa from message\n");
3014 "remove pa from message %" PRIu64 "\n",
3015 pm->logging_uuid);
3017 "remove pa from message %u\n",
3018 pm->pmt);
3020 "remove pa from message %s\n",
 /* Detach from the message's list of pending acknowledgements. */
3022 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3023 pa->pm = NULL;
3024 }
3025 if (NULL != dvh)
3026 {
3027 GNUNET_CONTAINER_MDLL_remove (dvh, dvh->pa_head, dvh->pa_tail, pa);
 /* NOTE(review): this branch unlinks pa from the dvh list but then
    clears pa->queue (already handled above) rather than pa->dvh --
    looks like a copy-paste slip. It has no visible effect only
    because pa is freed a few lines below; confirm and change to
    pa->dvh = NULL. */
3028 pa->queue = NULL;
3029 }
3032 &pa->ack_uuid.value,
3033 pa));
3034 GNUNET_free (pa);
3035}
3036
3037
/**
 * Recursively free every fragment below @a root.
 *
 * For each child fragment: its own sub-fragments are freed first, any
 * pending acknowledgements still attached to it are detached (their `pm`
 * back-pointer is cleared; the acknowledgements themselves are not freed
 * here), the fragment is unlinked from root's fragment list, a queue entry
 * still pointing at it is cleared, and finally the fragment is freed.
 * @a root itself is left allocated.
 *
 * NOTE(review): the name/parameter line and the opening line of the log
 * call are missing from this listing.
 */
3046static void
3048{
3049 struct PendingMessage *frag;
3050
 /* Always take the current head; it is unlinked on every iteration. */
3051 while (NULL != (frag = root->head_frag))
3052 {
3053 struct PendingAcknowledgement *pa;
3054
 /* Depth-first: release the grandchildren before the child. */
3055 free_fragment_tree (frag);
3056 while (NULL != (pa = frag->pa_head))
3057 {
3058 GNUNET_CONTAINER_MDLL_remove (pm, frag->pa_head, frag->pa_tail, pa);
3059 pa->pm = NULL;
3060 }
3061 GNUNET_CONTAINER_MDLL_remove (frag, root->head_frag, root->tail_frag, frag);
3062 if (NULL != frag->qe)
3063 {
 /* Break the mutual link so the queue entry does not dangle. */
3064 GNUNET_assert (frag == frag->qe->pm);
3065 frag->qe->pm = NULL;
3066 }
3068 "Free frag %p\n",
3069 frag);
3070 GNUNET_free (frag);
3071 }
3072}
3073
3074
/**
 * Release a pending message @a pm: unlink it from the owning client's and
 * virtual link's pending lists (or decrement the fragment counter of its
 * root message when it is a fragment), detach remaining pending
 * acknowledgements, clear queue-entry back-pointers, release an attached
 * `bpm` message including its fragment tree, and free @a pm.
 *
 * NOTE(review): the name/parameter line and the opening lines of several
 * calls (logging, list removal) are missing from this listing; only the
 * visible statements are described here.
 */
3082static void
3084{
3085 struct TransportClient *tc = pm->client;
3086 struct VirtualLink *vl = pm->vl;
3087 struct PendingAcknowledgement *pa;
3088
3090 "Freeing pm %p\n",
3091 pm);
3092 if (NULL != tc)
3093 {
 /* Operates on the CORE client's pending-message list. */
3095 tc->details.core.pending_msg_head,
3096 tc->details.core.pending_msg_tail,
3097 pm);
3098 }
 /* Only top-level messages (no frag_parent) are handled via the virtual
    link's pending list. */
3099 if ((NULL != vl) && (NULL == pm->frag_parent))
3100 {
3102 "Removing pm %" PRIu64 "\n",
3103 pm->logging_uuid);
3105 vl->pending_msg_head,
3106 vl->pending_msg_tail,
3107 pm);
3108 }
3109 else if (NULL != pm->frag_parent && PMT_DV_BOX != pm->pmt)
3110 {
3111 struct PendingMessage *root = pm->frag_parent;
3112
 /* Walk up to the top-most ancestor, stopping early at a DV box. */
3113 while (NULL != root->frag_parent && PMT_DV_BOX != root->pmt)
3114 root = root->frag_parent;
3115
3116 root->frag_count--;
3117 }
3118 while (NULL != (pa = pm->pa_head))
3119 {
 /* NOTE(review): pa cannot be NULL here -- the loop condition just
    established that -- so this first check is unreachable. The
    remaining checks look like leftover debugging output. */
3120 if (NULL == pa)
3122 "free pending pa null\n");
3123 if (NULL == pm->pa_tail)
3125 "free pending pa_tail null\n");
3126 if (NULL == pa->prev_pa)
3128 "free pending pa prev null\n");
3129 if (NULL == pa->next_pa)
3131 "free pending pa next null\n");
 /* Detach only; the acknowledgement itself is not freed here. */
3132 GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
3133 pa->pm = NULL;
3134 }
3135
3137 if (NULL != pm->qe)
3138 {
 /* Break the mutual link so the queue entry does not dangle. */
3139 GNUNET_assert (pm == pm->qe->pm);
3140 pm->qe->pm = NULL;
3141 }
3142 if (NULL != pm->bpm)
3143 {
 /* The attached bpm message is released together with pm: first its
    fragments, then its queue-entry back-pointer, then the message. */
3144 free_fragment_tree (pm->bpm);
3145 if (NULL != pm->bpm->qe)
3146 {
3147 struct QueueEntry *qe = pm->bpm->qe;
3148
3149 qe->pm = NULL;
3150 }
3151 GNUNET_free (pm->bpm);
3152 }
3153
3154 GNUNET_free (pm);
3156 "Freeing pm done\n");
3157}
3158
3159
3165static void
3167{
3168 struct VirtualLink *vl = rc->virtual_link;
3169
3173 rc->msg_uuid.uuid,
3174 rc));
3175 GNUNET_free (rc);
3176}
3177
3178
3184static void
3186{
3187 struct VirtualLink *vl = cls;
3188 struct ReassemblyContext *rc;
3189
3190 vl->reassembly_timeout_task = NULL;
3191 while (NULL != (rc = GNUNET_CONTAINER_heap_peek (vl->reassembly_heap)))
3192 {
3194 .rel_value_us)
3195 {
3197 continue;
3198 }
3203 vl);
3204 return;
3205 }
3206}
3207
3208
/**
 * Map-iteration callback invoked once per reassembly context.
 *
 * @param cls unused
 * @param key unused
 * @param value the `struct ReassemblyContext` of the visited entry
 * @return #GNUNET_OK, so the iteration always continues
 *
 * NOTE(review): the statement that actually consumes `rc` is missing from
 * this listing; judging by the name it presumably frees the context --
 * confirm against the full source.
 */
3217static int
3218free_reassembly_cb (void *cls, uint32_t key, void *value)
3219{
3220 struct ReassemblyContext *rc = value;
3221
3222 (void) cls;
3223 (void) key;
3225 return GNUNET_OK;
3226}
3227
3228
/**
 * Tear down a virtual link @a vl and free it.
 *
 * Visible steps: reset the reassembly map/heap pointers, handle the
 * reassembly timeout, visibility and flow-control retransmission tasks,
 * drain the pending-message list, detach every core-sent context from the
 * link, and free the structure. The link is expected to be detached from
 * both its neighbour and its distance vector already; a violation is only
 * reported via GNUNET_break(), the link is freed regardless.
 *
 * NOTE(review): the name/parameter line and most of the calls made inside
 * the branches are missing from this listing.
 */
3234static void
3236{
3237 struct PendingMessage *pm;
3238 struct CoreSentContext *csc;
3239
3241 "free virtual link %p\n",
3242 vl);
3243
3244 if (NULL != vl->reassembly_map)
3245 {
3248 NULL);
 /* Map and heap are reset together under the single map check. */
3250 vl->reassembly_map = NULL;
3252 vl->reassembly_heap = NULL;
3253 }
3254 if (NULL != vl->reassembly_timeout_task)
3255 {
3258 }
 /* Loop body is not visible; it must remove the head from the list for
    this loop to terminate. */
3259 while (NULL != (pm = vl->pending_msg_head))
3263 if (NULL != vl->visibility_task)
3264 {
3266 vl->visibility_task = NULL;
3267 }
3268 if (NULL != vl->fc_retransmit_task)
3269 {
3271 vl->fc_retransmit_task = NULL;
3272 }
3273 while (NULL != (csc = vl->csc_head))
3274 {
 /* The contexts outlive the link: they are only detached. */
3276 GNUNET_assert (vl == csc->vl);
3277 csc->vl = NULL;
3278 }
 /* Soft checks: callers should have cleared both references first. */
3279 GNUNET_break (NULL == vl->n);
3280 GNUNET_break (NULL == vl->dv);
3281 GNUNET_free (vl);
3282}
3283
3284
3290static void
3292{
3293 if (NULL != vs->revalidation_task)
3294 {
3295 GNUNET_SCHEDULER_cancel (vs->revalidation_task);
3296 vs->revalidation_task = NULL;
3297 }
3298 /*memcpy (&hkey,
3299 &hc,
3300 sizeof (hkey));*/
3302 "Remove key %s for address %s map size %u contains %u during freeing state\n",
3303 GNUNET_h2s (&vs->hc),
3304 vs->address,
3307 &vs->hc));
3310 GNUNET_YES ==
3313 vs->hn = NULL;
3314 if (NULL != vs->sc)
3315 {
3317 "store cancel\n");
3319 vs->sc = NULL;
3320 }
3321 GNUNET_free (vs->address);
3322 GNUNET_free (vs);
3323}
3324
3325
3332static struct Neighbour *
3334{
3336}
3337
3338
3345static struct VirtualLink *
3347{
3349}
3350
3351
3356{
3363
3368
3373
3378
3383};
3384
3385
/**
 * Free a distance-vector hop @a dvh: detach all pending acknowledgements
 * that reference it, unlink it from the next-hop neighbour's list of DV
 * hops, and release the memory.
 *
 * NOTE(review): the name/parameter line, the list-removal call inside the
 * loop and one further statement (which presumably uses `dv`) are missing
 * from this listing.
 */
3394static void
3396{
3397 struct Neighbour *n = dvh->next_hop;
3398 struct DistanceVector *dv = dvh->dv;
3399 struct PendingAcknowledgement *pa;
3400
3401 while (NULL != (pa = dvh->pa_head))
3402 {
 /* The acknowledgement survives; only its hop reference is cleared. */
3404 pa->dvh = NULL;
3405 }
3406 GNUNET_CONTAINER_MDLL_remove (neighbour, n->dv_head, n->dv_tail, dvh);
3408 GNUNET_free (dvh);
3409}
3410
3411
3418static void
3419check_link_down (void *cls);
3420
3421
3427static void
3429{
3431 "Informing CORE clients about disconnect from %s\n",
3432 GNUNET_i2s (pid));
3433 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3434 {
3435 struct GNUNET_MQ_Envelope *env;
3436 struct DisconnectInfoMessage *dim;
3437
3438 if (CT_CORE != tc->type)
3439 continue;
3441 dim->peer = *pid;
3442 GNUNET_MQ_send (tc->mq, env);
3443 }
3444}
3445
3446
/**
 * Free a distance-vector route @a dv once it has no hops left.
 *
 * After the hop list has been drained, the route is detached from its
 * virtual link; a link that has no neighbour either is freed as well. The
 * timeout task is handled, and `dv->km` and @a dv are released.
 *
 * NOTE(review): the name/parameter line, the body of the draining loop
 * and several calls are missing from this listing.
 */
3453static void
3455{
3456 struct DistanceVectorHop *dvh;
3457
3458 while (NULL != (dvh = dv->dv_head))
 /* Everything below only runs when the hop list is empty. */
3460 if (NULL == dv->dv_head)
3461 {
3462 struct VirtualLink *vl;
3463
3465 GNUNET_YES ==
3467 if (NULL != (vl = dv->vl))
3468 {
3469 GNUNET_assert (dv == vl->dv);
3470 vl->dv = NULL;
3471 if (NULL == vl->n)
3472 {
 /* Neither a neighbour nor a DV route is left: drop the link. */
3474 free_virtual_link (vl);
3475 }
3476 else
3477 {
 /* The link is still backed by a direct neighbour; the handling for
    this case is not visible here. */
3480 }
3481 dv->vl = NULL;
3482 }
3483
3484 if (NULL != dv->timeout_task)
3485 {
3487 dv->timeout_task = NULL;
3488 }
3489 GNUNET_free (dv->km);
3490 GNUNET_free (dv);
3491 }
3492}
3493
3494
/**
 * Send one monitor record to client `tc`: the fields of @a me are
 * converted to network byte order, and @a address (including its
 * terminating NUL) is appended directly after the fixed-size message.
 *
 * @param peer peer the record is about
 * @param address 0-terminated address string; must not be NULL because
 *        strlen() is applied to it
 * @param me event data to serialise
 *
 * NOTE(review): the function-name line, the `nt` parameter line, the
 * declaration of `md` and the envelope allocation are missing from this
 * listing.
 */
3508static void
3510 const struct GNUNET_PeerIdentity *peer,
3511 const char *address,
3513 const struct MonitorEvent *me)
3514{
3515 struct GNUNET_MQ_Envelope *env;
 /* +1 so the NUL terminator travels with the string. */
3517 size_t addr_len = strlen (address) + 1;
3518
3520 addr_len,
3522 md->nt = htonl ((uint32_t) nt);
3523 md->peer = *peer;
3524 md->last_validation = GNUNET_TIME_absolute_hton (me->last_validation);
3525 md->valid_until = GNUNET_TIME_absolute_hton (me->valid_until);
3526 md->next_validation = GNUNET_TIME_absolute_hton (me->next_validation);
3527 md->rtt = GNUNET_TIME_relative_hton (me->rtt);
3528 md->cs = htonl ((uint32_t) me->cs);
3529 md->num_msg_pending = htonl (me->num_msg_pending);
3530 md->num_bytes_pending = htonl (me->num_bytes_pending);
 /* Variable-size tail: the address follows the struct in the message. */
3531 memcpy (&md[1], address, addr_len);
3532 GNUNET_MQ_send (tc->mq, env);
3533}
3534
3535
/**
 * Walk all connected clients and select the monitors that should learn
 * about an event concerning `peer`.
 *
 * A client is skipped when it is not of type CT_MONITOR, when it is a
 * one-shot monitor, or when it registered a specific (non-zero) peer that
 * differs from `peer`.
 *
 * @param address address the event refers to
 * @param me the event
 *
 * NOTE(review): the function-name line, the `nt` parameter line and the
 * per-client call at the end of the loop body are missing from this
 * listing.
 */
3545static void
3547 const char *address,
3549 const struct MonitorEvent *me)
3550{
3551 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3552 {
3553 if (CT_MONITOR != tc->type)
3554 continue;
3555 if (tc->details.monitor.one_shot)
3556 continue;
 /* An all-zero peer filter matches every peer. */
3557 if ((GNUNET_NO == GNUNET_is_zero (&tc->details.monitor.peer)) &&
3558 (0 != GNUNET_memcmp (&tc->details.monitor.peer, peer)))
3559 continue;
3561 }
3562}
3563
3564
3574static void *
3576 struct GNUNET_SERVICE_Client *client,
3577 struct GNUNET_MQ_Handle *mq)
3578{
3579 struct TransportClient *tc;
3580
3581 (void) cls;
3582 tc = GNUNET_new (struct TransportClient);
3583 tc->client = client;
3584 tc->mq = mq;
3587 "Client %p of type %u connected\n",
3588 tc,
3589 tc->type);
3590 return tc;
3591}
3592
3593
/**
 * Release the memory used by @a neighbour.
 *
 * Precondition (asserted): the neighbour has no queues left. Distance
 * vector hops routed through this neighbour are processed, and routes
 * left without any hop are freed; the outstanding `get` and peerstore
 * store operations are handled; finally the neighbour is detached from
 * its virtual link.
 *
 * @param neighbour neighbour entry to free
 *
 * NOTE(review): several calls (map removal, hop release, cancellation of
 * `get`, and both branches of the virtual-link handling) are missing from
 * this listing.
 */
3599static void
3600free_neighbour (struct Neighbour *neighbour)
3601{
3602 struct DistanceVectorHop *dvh;
3603 struct VirtualLink *vl;
3604
3605 GNUNET_assert (NULL == neighbour->queue_head);
3608 &neighbour->pid,
3609 neighbour));
3611 "Freeing neighbour\n");
3612 while (NULL != (dvh = neighbour->dv_head))
3613 {
 /* Remember the route before the hop goes away (the statement that
    releases the hop is not visible here). */
3614 struct DistanceVector *dv = dvh->dv;
3615
3617 if (NULL == dv->dv_head)
3618 free_dv_route (dv);
3619 }
3620 if (NULL != neighbour->get)
3621 {
3623 neighbour->get = NULL;
3624 }
3625 if (NULL != neighbour->sc)
3626 {
3628 "store cancel\n");
3629 GNUNET_PEERSTORE_store_cancel (neighbour->sc);
3630 neighbour->sc = NULL;
3631 }
3632 if (NULL != (vl = neighbour->vl))
3633 {
3634 GNUNET_assert (neighbour == vl->n);
 /* From here on the link has no direct neighbour any more. */
3635 vl->n = NULL;
3636 if (NULL == vl->dv)
3637 {
3640 }
3641 else
3642 {
3645 }
3646 neighbour->vl = NULL;
3647 }
3648 GNUNET_free (neighbour);
3649}
3650
3651
3658static void
3660 const struct GNUNET_PeerIdentity *pid)
3661{
3662 struct GNUNET_MQ_Envelope *env;
3663 struct ConnectInfoMessage *cim;
3664
3665 GNUNET_assert (CT_CORE == tc->type);
3667 cim->id = *pid;
3668 GNUNET_MQ_send (tc->mq, env);
3669}
3670
3671
3677static void
3679{
3681 "Informing CORE clients about connection to %s\n",
3682 GNUNET_i2s (pid));
3683 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
3684 {
3685 if (CT_CORE != tc->type)
3686 continue;
3688 }
3689}
3690
3691
3699static void
3700transmit_on_queue (void *cls);
3701
3702
/**
 * Check whether, among the queues reachable from `queue_head`, there is a
 * usable queue of a different communicator that has a strictly higher
 * priority than `queue`. "Usable" means it has remaining capacity
 * (q_capacity > 0) and fewer than QUEUE_LENGTH_LIMIT entries.
 *
 * @return #GNUNET_YES if such a queue exists, #GNUNET_NO otherwise
 *
 * NOTE(review): the function-name/parameter line and the opening lines of
 * the log calls are missing from this listing.
 */
3706static unsigned int
3708{
3709 for (struct Queue *s = queue_head; NULL != s;
3710 s = s->next_client)
3711 {
 /* NOTE(review): the prefixes are compared by pointer, not by string
    content. That distinguishes communicator *clients* only if each one
    owns its own prefix buffer; two communicators with equal prefix
    text still count as different. Confirm this is intended. */
3712 if (s->tc->details.communicator.address_prefix !=
3713 queue->tc->details.communicator.address_prefix)
3714 {
3716 "queue address %s qid %u compare with queue: address %s qid %u\n",
3717 queue->address,
3718 queue->qid,
3719 s->address,
3720 s->qid);
3721 if ((s->priority > queue->priority) && (0 < s->q_capacity) &&
3722 (QUEUE_LENGTH_LIMIT > s->queue_length) )
3723 return GNUNET_YES;
3725 "Lower prio\n");
3726 }
3727 }
3728 return GNUNET_NO;
3729}
3730
3731
/**
 * Decide whether @a queue may transmit now and, if so, (re)schedule its
 * transmit task.
 *
 * Returns without scheduling when the queue's validation has expired,
 * when a further precondition (not fully visible here) fails, or when one
 * of three throttles applies: the communicator-wide queue limit, the
 * per-queue length limit, or a queue capacity of zero. In each throttled
 * case a statistic is bumped and the queue is marked as not idle.
 *
 * @param queue queue to consider for transmission
 *
 * NOTE(review): the function-name line, the remaining parameter lines,
 * the computation of `now` and the opening lines of several calls are
 * missing from this listing.
 */
3739static void
3741 struct Queue *queue,
3743{
3745
 /* Never transmit on a queue whose validation is in the past. */
3746 if (queue->validated_until.abs_value_us < now.abs_value_us)
3747 return;
3749 queue->tc->details.communicator.
3750 queue_head))
3751 return;
3752
3753 if (queue->tc->details.communicator.total_queue_length >=
3755 {
3757 "Transmission on queue %s (QID %u) throttled due to communicator queue limit\n",
3758 queue->address,
3759 queue->qid);
3761 GST_stats,
3762 "# Transmission throttled due to communicator queue limit",
3763 1,
3764 GNUNET_NO);
3765 queue->idle = GNUNET_NO;
3766 return;
3767 }
3768 if (queue->queue_length >= QUEUE_LENGTH_LIMIT)
3769 {
3771 "Transmission on queue %s (QID %u) throttled due to communicator queue length limit\n",
3772 queue->address,
3773 queue->qid);
3775 "# Transmission throttled due to queue queue limit",
3776 1,
3777 GNUNET_NO);
3778 queue->idle = GNUNET_NO;
3779 return;
3780 }
3781 if (0 == queue->q_capacity)
3782 {
3784 "Transmission on queue %s (QID %u) throttled due to communicator message has capacity %"
3785 PRIu64 ".\n",
3786 queue->address,
3787 queue->qid,
3788 queue->q_capacity);
3790 "# Transmission throttled due to message queue capacity",
3791 1,
3792 GNUNET_NO);
3793 queue->idle = GNUNET_NO;
3794 return;
3795 }
3796 /* queue might indeed be ready, schedule it */
 /* Replace, rather than duplicate, an already scheduled task. */
3797 if (NULL != queue->transmit_task)
3798 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3799 queue->transmit_task =
3801 queue);
3803 "Considering transmission on queue `%s' QID %llu to %s\n",
3804 queue->address,
3805 (unsigned long long) queue->qid,
3806 GNUNET_i2s (&queue->neighbour->pid));
3807}
3808
3809
/**
 * Scheduler task checking whether a virtual link is still backed by a
 * valid path.
 *
 * Computes the latest `path_valid_until` over all DV hops and the latest
 * `validated_until` over all queues of the neighbour. A backing whose
 * latest timestamp has already passed is detached from the link. If
 * neither a neighbour nor a DV route remains, the link is freed;
 * otherwise the task is re-armed for the later of the two timestamps.
 *
 * `cls` is the `struct VirtualLink` to check.
 *
 * NOTE(review): the function-name line, the opening line of the log call,
 * one statement before free_virtual_link() and the callback argument of
 * the re-scheduling call are missing from this listing.
 */
3816static void
3818{
3819 struct VirtualLink *vl = cls;
3820 struct DistanceVector *dv = vl->dv;
3821 struct Neighbour *n = vl->n;
3822 struct GNUNET_TIME_Absolute dvh_timeout;
3823 struct GNUNET_TIME_Absolute q_timeout;
3824
3826 "Checking if link is down\n");
 /* We are running, so the stored task handle is no longer valid. */
3827 vl->visibility_task = NULL;
3828 dvh_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
3829 if (NULL != dv)
3830 {
3831 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
3832 pos = pos->next_dv)
3833 dvh_timeout = GNUNET_TIME_absolute_max (dvh_timeout,
3834 pos->path_valid_until);
3835 if (0 == GNUNET_TIME_absolute_get_remaining (dvh_timeout).rel_value_us)
3836 {
 /* Every DV path has expired: unlink route and link from each other
    (the route itself is not freed here). */
3837 vl->dv->vl = NULL;
3838 vl->dv = NULL;
3839 }
3840 }
3841 q_timeout = GNUNET_TIME_UNIT_ZERO_ABS;
 /* NOTE(review): unlike dv above, n is dereferenced without a NULL
    check, although a link may exist with vl->n == NULL (this very
    function clears it below, and free_neighbour() does so too). If the
    task can run for a DV-only link, this loop -- and `vl->n->vl` below,
    since q_timeout would stay at zero -- dereferences NULL. Confirm
    and guard with `if (NULL != n)`. */
3842 for (struct Queue *q = n->queue_head; NULL != q; q = q->next_neighbour)
3843 q_timeout = GNUNET_TIME_absolute_max (q_timeout, q->validated_until);
3844 if (0 == GNUNET_TIME_absolute_get_remaining (q_timeout).rel_value_us)
3845 {
 /* No queue is validated any more: unlink neighbour and link. */
3846 vl->n->vl = NULL;
3847 vl->n = NULL;
3848 }
3849 if ((NULL == vl->n) && (NULL == vl->dv))
3850 {
3852 free_virtual_link (vl);
3853 return;
3854 }
 /* Look again when the longest-lived remaining backing would expire. */
3855 vl->visibility_task =
3856 GNUNET_SCHEDULER_add_at (GNUNET_TIME_absolute_max (q_timeout, dvh_timeout),
3858 vl);
3859}
3860
3861
/**
 * Tear down a communicator queue.
 *
 * Visible steps: cancel the transmit task, detach pending
 * acknowledgements, unlink the queue from its neighbour's and its
 * communicator's lists, free all queue entries while keeping the
 * per-queue and per-communicator counters in sync, un-throttle the
 * communicator's other queues if the total dropped below
 * COMMUNICATOR_TOTAL_QUEUE_LIMIT, notify monitors with a
 * GNUNET_TRANSPORT_CS_DOWN event, re-evaluate the virtual link, and free
 * the neighbour once it has no queues left.
 *
 * NOTE(review): the function-name line, the list-removal call heads, the
 * assignment of `maxxed`, the per-queue resume call and one statement
 * after notify_monitors() are missing from this listing.
 */
3867static void
3869{
3870 struct Neighbour *neighbour = queue->neighbour;
3871 struct TransportClient *tc = queue->tc;
3872 struct MonitorEvent me = { .cs = GNUNET_TRANSPORT_CS_DOWN,
3874 struct QueueEntry *qe;
3875 int maxxed;
3876 struct PendingAcknowledgement *pa;
3877 struct VirtualLink *vl;
3878
3880 "Cleaning up queue %u\n", queue->qid);
3881 if (NULL != queue->transmit_task)
3882 {
3883 GNUNET_SCHEDULER_cancel (queue->transmit_task);
3884 queue->transmit_task = NULL;
3885 }
3886 while (NULL != (pa = queue->pa_head))
3887 {
 /* Acknowledgements survive the queue; only the reference is cut. */
3888 GNUNET_CONTAINER_MDLL_remove (queue, queue->pa_head, queue->pa_tail, pa);
3889 pa->queue = NULL;
3890 }
3891
3893 neighbour->queue_head,
3894 neighbour->queue_tail,
3895 queue);
3897 tc->details.communicator.queue_head,
3898 tc->details.communicator.queue_tail,
3899 queue);
 /* Presumably `maxxed` is computed here from the communicator's total
    queue length *before* the entries are released -- the assignment
    itself is not visible; confirm against the full source. */
3901 tc->details.communicator.
3902 total_queue_length);
3904 "Cleaning up queue with length %u\n",
3905 queue->queue_length);
3906 while (NULL != (qe = queue->queue_head))
3907 {
3908 GNUNET_CONTAINER_DLL_remove (queue->queue_head, queue->queue_tail, qe);
3909 queue->queue_length--;
3910 tc->details.communicator.total_queue_length--;
3911 if (NULL != qe->pm)
3912 {
 /* Break the mutual link so the message does not dangle. */
3913 GNUNET_assert (qe == qe->pm->qe);
3914 qe->pm->qe = NULL;
3915 }
3916 GNUNET_free (qe);
3917 }
3919 "Cleaning up queue with length %u\n",
3920 queue->queue_length);
3921 GNUNET_assert (0 == queue->queue_length);
3922 if ((maxxed) && (COMMUNICATOR_TOTAL_QUEUE_LIMIT >
3923 tc->details.communicator.total_queue_length))
3924 {
3925 /* Communicator dropped below threshold, resume all _other_ queues */
 /* The -1 undoes the +1 recorded when the limit was hit. */
3927 GST_stats,
3928 "# Transmission throttled due to communicator queue limit",
3929 -1,
3930 GNUNET_NO);
3931 for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
3932 s = s->next_client)
3934 s,
3936 }
3937 notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
3939
3940 vl = lookup_virtual_link (&neighbour->pid);
3941 if ((NULL != vl) && (neighbour == vl->n))
3942 {
 /* Run the check synchronously; it may detach the neighbour from vl
    and even free vl, so vl must not be used afterwards. */
3944 check_link_down (vl);
3945 }
3946 if (NULL == neighbour->queue_head)
3947 {
3948 free_neighbour (neighbour);
3949 }
3950}
3951
3952
3958static void
3960{
3961 struct TransportClient *tc = ale->tc;
3962
3963 GNUNET_CONTAINER_DLL_remove (tc->details.communicator.addr_head,
3964 tc->details.communicator.addr_tail,
3965 ale);
3966 if (NULL != ale->sc)
3967 {
3969 "store cancel\n");
3971 ale->sc = NULL;
3972 }
3973 if (NULL != ale->st)
3974 {
3976 ale->st = NULL;
3977 }
3978 if (NULL != ale->signed_address)
3980 GNUNET_free (ale);
3981}
3982
3983
/**
 * Peer-map iteration callback that disposes of one peer request of an
 * application client: the request's `nc` handle is reset, the entry is
 * removed from the client's request map and the request is freed.
 *
 * @param pid key of the entry being visited
 * @param value the `struct PeerRequest` stored under @a pid
 * @return #GNUNET_OK, so the iteration always continues
 *
 * NOTE(review): the function-name line (including the `cls` parameter,
 * which is used as the `struct TransportClient`), the call made when
 * `pr->nc` is set, and the head of the assertion wrapping the map removal
 * are missing from this listing.
 */
3992static int
3994 const struct GNUNET_PeerIdentity *pid,
3995 void *value)
3996{
3997 struct TransportClient *tc = cls;
3998 struct PeerRequest *pr = value;
3999
4000 if (NULL != pr->nc)
4002 pr->nc = NULL;
 /* The entry must exist: removal is expected to report GNUNET_YES. */
4004 GNUNET_YES ==
4005 GNUNET_CONTAINER_multipeermap_remove (tc->details.application.requests,
4006 pid,
4007 pr));
4008 GNUNET_free (pr);
4009
4010 return GNUNET_OK;
4011}
4012
4013
4014static void
4015do_shutdown (void *cls);
4016
/**
 * Service callback run when a client disconnects. Performs the cleanup
 * that matches the client's type, frees the client record and, if the
 * service is shutting down and no clients remain, continues with
 * do_shutdown().
 *
 * Per type:
 *  - CT_NONE / CT_MONITOR: nothing beyond logging is visible.
 *  - CT_CORE: pending messages are detached from the client (they are not
 *    freed; only `pm->client` is cleared).
 *  - CT_COMMUNICATOR: all queues are freed, address entries are drained
 *    and the address prefix is released.
 *  - CT_APPLICATION: the request map is iterated and then destroyed.
 *
 * @param client service-level handle (unused)
 * @param app_ctx the `struct TransportClient` of the client
 *
 * NOTE(review): the function-name line (with the `cls` parameter), the
 * statement right after `(void) client;`, the log-call heads and a few
 * call heads inside the cases are missing from this listing.
 */
4025static void
4027 struct GNUNET_SERVICE_Client *client,
4028 void *app_ctx)
4029{
4030 struct TransportClient *tc = app_ctx;
4031
 /* NOTE(review): cls is marked unused here but is passed on to
    do_shutdown() at the end of the function. */
4032 (void) cls;
4033 (void) client;
4035 switch (tc->type)
4036 {
4037 case CT_NONE:
4039 "Unknown Client %p disconnected, cleaning up.\n",
4040 tc);
4041 break;
4042
4043 case CT_CORE: {
4045 "CORE Client %p disconnected, cleaning up.\n",
4046 tc);
4047
4048 struct PendingMessage *pm;
4049
4050 while (NULL != (pm = tc->details.core.pending_msg_head))
4051 {
4053 tc->details.core.pending_msg_head,
4054 tc->details.core.pending_msg_tail,
4055 pm);
 /* The message stays alive; it just has no client to report to. */
4056 pm->client = NULL;
4057 }
4058 }
4059 break;
4060
4061 case CT_MONITOR:
4063 "MONITOR Client %p disconnected, cleaning up.\n",
4064 tc);
4065
4066 break;
4067
4068 case CT_COMMUNICATOR: {
4070 "COMMUNICATOR Client %p disconnected, cleaning up.\n",
4071 tc);
4072
4073 struct Queue *q;
4074 struct AddressListEntry *ale;
4075
4076 if (NULL != tc->details.communicator.free_queue_entry_task)
4078 tc->details.communicator.free_queue_entry_task);
 /* free_queue() unlinks the queue from this list, so re-reading the
    head each time terminates. */
4079 while (NULL != (q = tc->details.communicator.queue_head))
4080 free_queue (q);
4081 while (NULL != (ale = tc->details.communicator.addr_head))
4083 GNUNET_free (tc->details.communicator.address_prefix);
4084 }
4085 break;
4086
4087 case CT_APPLICATION:
4089 "APPLICATION Client %p disconnected, cleaning up.\n",
4090 tc);
4091
4092 GNUNET_CONTAINER_multipeermap_iterate (tc->details.application.requests,
4094 tc);
4095 GNUNET_CONTAINER_multipeermap_destroy (tc->details.application.requests);
4096 break;
4097 }
4098 GNUNET_free (tc);
 /* Shutdown was deferred until the last client is gone. */
4099 if ((GNUNET_YES == in_shutdown) && (NULL == clients_head))
4100 {
4102 "Our last client disconnected\n");
4103 do_shutdown (cls);
4104 }
4105}
4106
4107
4117static int
4119 const struct GNUNET_PeerIdentity *pid,
4120 void *value)
4121{
4122 struct TransportClient *tc = cls;
4123 struct VirtualLink *vl = value;
4124
4125 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4126 return GNUNET_OK;
4127
4129 "Telling new CORE client about existing connection to %s\n",
4130 GNUNET_i2s (pid));
4132 return GNUNET_OK;
4133}
4134
4135
4141static void
4143 unsigned
4144 int free_cmc);
4145
4146static enum GNUNET_GenericReturnValue
4148 const struct GNUNET_PeerIdentity *pid,
4149 void *value)
4150{
4151 struct VirtualLink *vl = value;
4152 struct CommunicatorMessageContext *cmc;
4153
4154 /* resume communicators */
4155 while (NULL != (cmc = vl->cmc_tail))
4156 {
4158 if (GNUNET_NO == cmc->continue_send)
4160 }
4161 return GNUNET_OK;
4162}
4163
4164
/**
 * Handle the START message with which a client registers as CORE.
 *
 * The message is rejected when option bit 0 is set and the identity the
 * client supplies differs from GST_my_identity, or when the client has
 * already been assigned a type. Otherwise the client becomes a CT_CORE
 * client.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @param start the START message
 *
 * NOTE(review): the statements executed on rejection (between
 * GNUNET_break() and `return`) and the calls after registration are
 * missing from this listing.
 */
4173static void
4174handle_client_start (void *cls, const struct StartMessage *start)
4175{
4176 struct TransportClient *tc = cls;
4177 uint32_t options;
4178
4179 options = ntohl (start->options);
 /* Bit 0 asks for the client's notion of our identity to be verified. */
4180 if ((0 != (1 & options)) &&
4181 (0 != GNUNET_memcmp (&start->self, &GST_my_identity)))
4182 {
4183 /* client thinks this is a different peer, reject */
4184 GNUNET_break (0);
4186 return;
4187 }
 /* A client may pick its role exactly once. */
4188 if (CT_NONE != tc->type)
4189 {
4190 GNUNET_break (0);
4192 return;
4193 }
4194 tc->type = CT_CORE;
4196 "New CORE client with PID %s registered\n",
4197 GNUNET_i2s (&start->self));
4200 tc);
4203 NULL);
4205}
4206
4207
4214static int
4215check_client_send (void *cls, const struct OutboundMessage *obm)
4216{
4217 struct TransportClient *tc = cls;
4218 uint16_t size;
4219 const struct GNUNET_MessageHeader *obmm;
4220
4221 if (CT_CORE != tc->type)
4222 {
4223 GNUNET_break (0);
4224 return GNUNET_SYSERR;
4225 }
4226 size = ntohs (obm->header.size) - sizeof(struct OutboundMessage);
4227 if (size < sizeof(struct GNUNET_MessageHeader))
4228 {
4229 GNUNET_break (0);
4230 return GNUNET_SYSERR;
4231 }
4232 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
4233 if (size != ntohs (obmm->size))
4234 {
4235 GNUNET_break (0);
4236 return GNUNET_SYSERR;
4237 }
4238 return GNUNET_OK;
4239}
4240
4241
/**
 * Finish processing of pending message `pm`: if the client that submitted
 * it is still connected, send it a SendOkMessage naming the target peer
 * of the message's virtual link.
 *
 * NOTE(review): the function-name/parameter line, the envelope
 * allocation, the log-call heads and the final statement of the function
 * are missing from this listing.
 */
4249static void
4251{
4252 struct TransportClient *tc = pm->client;
4253 struct VirtualLink *vl = pm->vl;
4254
4256 "client send response\n");
 /* tc is NULL once the submitting client has disconnected. */
4257 if (NULL != tc)
4258 {
4259 struct GNUNET_MQ_Envelope *env;
4260 struct SendOkMessage *so_msg;
4261
 /* NOTE(review): vl is dereferenced without a NULL check; this assumes
    every message that still has a client also has a virtual link --
    confirm against the callers. */
4263 so_msg->peer = vl->target;
4265 "Confirming transmission of <%" PRIu64 "> to %s\n",
4266 pm->logging_uuid,
4267 GNUNET_i2s (&vl->target));
4268 GNUNET_MQ_send (tc->mq, env);
4269 }
4271}
4272
4273
/**
 * Select up to @a hops_array_length hops of distance vector `dv` and
 * store them in @a hops_array.
 *
 * Unless RMO_UNCONFIRMED_ALLOWED is set in `options`, hops whose
 * `path_valid_until` has passed are not counted as eligible. If the
 * number of eligible hops fits into the array, hops are copied directly;
 * otherwise distinct values are drawn from the weight space
 * [0, num_dv) -- each hop owning a range of size
 * MAX_DV_HOPS_ALLOWED - distance, so shorter paths are more likely -- and
 * every hop whose range contains a drawn value is stored.
 *
 * @param hops_array[out] receives the selected hops
 * @param hops_array_length capacity of @a hops_array
 * @return number of entries written to @a hops_array
 *
 * NOTE(review): the function-name line with the `dv` and `options`
 * parameters and the expression that draws the random value are missing
 * from this listing.
 */
4283static unsigned int
4286 struct DistanceVectorHop **hops_array,
4287 unsigned int hops_array_length)
4288{
4289 uint64_t choices[hops_array_length];
4290 uint64_t num_dv;
4291 unsigned int dv_count;
4292
4293 /* Pick random vectors, but weighted by distance, giving more weight
4294 to shorter vectors */
4295 num_dv = 0;
4296 dv_count = 0;
 /* Pass 1: count eligible hops and sum up their weights. */
4297 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4298 pos = pos->next_dv)
4299 {
4300 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4301 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4302 .rel_value_us == 0))
4303 continue; /* pos unconfirmed and confirmed required */
4304 num_dv += MAX_DV_HOPS_ALLOWED - pos->distance;
4305 dv_count++;
4306 }
4307 if (0 == dv_count)
4308 return 0;
4309 if (dv_count <= hops_array_length)
4310 {
 /* NOTE(review): this shortcut copies *every* hop in the list and
    does not repeat the eligibility filter of pass 1. When ineligible
    hops exist, (a) they are returned although confirmed paths were
    requested, and (b) the list may hold more than hops_array_length
    entries, so the write below can run past the end of hops_array.
    Confirm and apply the same filter here. */
4311 dv_count = 0;
4312 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4313 pos = pos->next_dv)
4314 hops_array[dv_count++] = pos;
4315 return dv_count;
4316 }
 /* Draw hops_array_length pairwise distinct values; a draw that
    collides with an earlier one is simply repeated. */
4317 for (unsigned int i = 0; i < hops_array_length; i++)
4318 {
4319 int ok = GNUNET_NO;
4320 while (GNUNET_NO == ok)
4321 {
4322 choices[i] =
4324 ok = GNUNET_YES;
4325 for (unsigned int j = 0; j < i; j++)
4326 if (choices[i] == choices[j])
4327 {
4328 ok = GNUNET_NO;
4329 break;
4330 }
4331 }
4332 }
4333 dv_count = 0;
4334 num_dv = 0;
 /* Pass 2: map each drawn value back to the hop owning that part of the
    weight space. Several values may land in the same hop's range, in
    which case that hop is stored more than once. */
4335 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
4336 pos = pos->next_dv)
4337 {
4338 uint32_t delta = MAX_DV_HOPS_ALLOWED - pos->distance;
4339
4340 if ((0 == (options & RMO_UNCONFIRMED_ALLOWED)) &&
4341 (GNUNET_TIME_absolute_get_remaining (pos->path_valid_until)
4342 .rel_value_us == 0))
4343 continue; /* pos unconfirmed and confirmed required */
4344 for (unsigned int i = 0; i < hops_array_length; i++)
4345 if ((num_dv <= choices[i]) && (num_dv + delta > choices[i]))
4346 hops_array[dv_count++] = pos;
4347 num_dv += delta;
4348 }
4349 return dv_count;
4350}
4351
4352
/**
 * Validation step for a communicator's "available" message.
 *
 * NOTE(review): the function-name line and the message parameter (`cam`)
 * are on lines missing from this extract; presumably this is
 * check_communicator_available -- TODO confirm.
 *
 * Besides validating, this marks the client as CT_COMMUNICATOR.
 *
 * @param cls the `struct TransportClient` that sent the message
 * @return #GNUNET_OK if the message is acceptable, #GNUNET_SYSERR if the
 *         client already had a type assigned
 */
4359static int
4361 void *cls,
4363{
4364 struct TransportClient *tc = cls;
4365 uint16_t size;
4366
4367 if (CT_NONE != tc->type)
4368 {
4369 GNUNET_break (0);
4370 return GNUNET_SYSERR;
4371 }
4372 tc->type = CT_COMMUNICATOR;
 /* number of bytes that follow the fixed-size part of the message */
4373 size = ntohs (cam->header.size) - sizeof(*cam);
4374 if (0 == size)
4375 return GNUNET_OK; /* receive-only communicator */
4377 return GNUNET_OK;
4378}
4379
4380
/**
 * Finish processing of an incoming communicator message: if the message
 * asked for flow control (im.fc_on != 0), send an acknowledgement carrying
 * its fc_id back to the communicator, then optionally release the context.
 *
 * NOTE(review): the function-name line and the `cmc` parameter declaration
 * are missing from this extract (numbering jumps 4386 -> 4388), as are the
 * lines that declare `ack` and allocate `env`.
 *
 * @param free_cmc #GNUNET_YES to free `cmc` before returning
 */
4386static void
4388 unsigned
4389 int free_cmc)
4390{
4391 if (0 != ntohl (cmc->im.fc_on))
4392 {
4393 /* send ACK when done to communicator for flow control! */
4394 struct GNUNET_MQ_Envelope *env;
4396
4398 "Acknowledge message with flow control id %" PRIu64 "\n",
4399 cmc->im.fc_id);
4401 ack->reserved = htonl (0);
 /* fc_id and sender are copied as-is from the incoming message */
4402 ack->fc_id = cmc->im.fc_id;
4403 ack->sender = cmc->im.neighbour_sender;
4404 GNUNET_MQ_send (cmc->tc->mq, env);
4405 }
4406
4408
4409 if (GNUNET_YES == free_cmc)
4410 {
4411 GNUNET_free (cmc);
4412 }
4413}
4414
4415
/* NOTE(review): both the declarator (listing line 4417) and the single body
   line (4419) of this static void function are missing from this extract;
   presumably this is the finish_cmc_handling() helper that other code in
   this file calls -- TODO confirm against the complete source. */
4416static void
4418{
4420}
4421
4422
/**
 * Handle a RECV_OK message from a CORE client: enlarge the CORE receive
 * window of the virtual link to the given peer and, once the window is
 * positive again, walk the link's queued communicator contexts.
 *
 * NOTE(review): several lines of this function (listing numbers 4443, 4449,
 * 4453, 4458, 4461, 4467, 4469) are missing from this extract, including
 * the body of the resume loop.
 *
 * @param cls the `struct TransportClient` the message came from
 * @param rom the message; `peer` identifies the link,
 *        `increase_window_delta` is in network byte order
 */
4432static void
4433handle_client_recv_ok (void *cls, const struct RecvOkMessage *rom)
4434{
4435 struct TransportClient *tc = cls;
4436 struct VirtualLink *vl;
4437 uint32_t delta;
4438 struct CommunicatorMessageContext *cmc;
4439
 /* only CORE clients may send this message */
4440 if (CT_CORE != tc->type)
4441 {
4442 GNUNET_break (0);
4444 return;
4445 }
4446 vl = lookup_virtual_link (&rom->peer);
4447 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
4448 {
4450 "# RECV_OK dropped: virtual link unknown",
4451 1,
4452 GNUNET_NO);
4454 return;
4455 }
4456 delta = ntohl (rom->increase_window_delta);
4457 vl->core_recv_window += delta;
4459 "CORE ack receiving message, increased CORE recv window to %d\n",
4460 vl->core_recv_window);
 /* window still exhausted: keep communicators suspended */
4462 if (vl->core_recv_window <= 0)
4463 return;
4464 /* resume communicators */
4465 while (NULL != (cmc = vl->cmc_tail))
4466 {
4468 if (GNUNET_NO == cmc->continue_send)
4470 }
4471}
4472
4473
/**
 * Handle a communicator's "available" message: remember the address prefix
 * that follows the fixed-size message (if any) for this client.
 *
 * NOTE(review): the function-name line and the message parameter (`cam`)
 * are missing from this extract, as are the right-hand side of the `cc`
 * assignment and the log call heads.
 *
 * @param cls the `struct TransportClient` of the communicator
 */
4480static void
4482 void *cls,
4484{
4485 struct TransportClient *tc = cls;
4486 uint16_t size;
4487
4488 size = ntohs (cam->header.size) - sizeof(*cam);
4489 if (0 == size)
4490 {
4492 "Receive-only communicator connected\n");
4493 return; /* receive-only communicator */
4494 }
 /* the prefix string sits directly behind the message struct; the copy is
    owned by the client record */
4495 tc->details.communicator.address_prefix =
4496 GNUNET_strdup ((const char *) &cam[1]);
4497 tc->details.communicator.cc =
4500 "Communicator with prefix `%s' connected\n",
4501 tc->details.communicator.address_prefix);
4503}
4504
4505
/**
 * Validate a communicator backchannel message: the payload must consist of
 * an inner message followed by a non-empty, 0-terminated string.
 *
 * NOTE(review): the function-name line and the message parameter (`cb`)
 * are missing from this extract; the name check_communicator_backchannel is
 * taken from a comment elsewhere in this file.
 *
 * @param cls unused
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
4513static int
4515 void *cls,
4517{
4518 const struct GNUNET_MessageHeader *inbox;
4519 const char *is;
4520 uint16_t msize;
4521 uint16_t isize;
4522
4523 (void) cls;
 /* msize: bytes following the fixed-size part of the outer message */
4524 msize = ntohs (cb->header.size) - sizeof(*cb);
4525 inbox = (const struct GNUNET_MessageHeader *) &cb[1];
 /* NOTE(review): inbox->size is read here without a visible check that
    msize covers a complete message header -- verify that the caller
    guarantees this. */
4526 isize = ntohs (inbox->size);
 /* the inner message must leave at least one byte for the string */
4527 if (isize >= msize)
4528 {
4529 GNUNET_break (0);
4530 return GNUNET_SYSERR;
4531 }
4532 is = (const char *) inbox;
4533 is += isize;
4534 msize -= isize;
4535 GNUNET_assert (0 < msize);
 /* last remaining byte must be the string terminator */
4536 if ('\0' != is[msize - 1])
4537 {
4538 GNUNET_break (0);
4539 return GNUNET_SYSERR;
4540 }
4541 return GNUNET_OK;
4542}
4543
4544
/**
 * Refresh the ephemeral validity of a distance vector and fill in
 * dv->sender_sig over a `struct EphemeralConfirmationPS` that names
 * dv->target.
 *
 * NOTE(review): most of this function (name/parameter line, the value
 * assigned to ephemeral_validity, the remaining `ec` fields and the head of
 * the signing call) is missing from this extract; the name sign_ephemeral
 * is taken from a call site elsewhere in this file.
 */
4550static void
4552{
4553 struct EphemeralConfirmationPS ec;
4554
4556 dv->ephemeral_validity =
4559 ec.target = dv->target;
4562 ec.purpose.size = htonl (sizeof(ec));
4564 &ec,
4565 &dv->sender_sig);
4566}
4567
4568
/* Forward declaration. NOTE(review): the line with the function name and
   first parameter is missing from this extract; presumably this declares
   free_queue_entry(), which is called with a queue entry and a transport
   client further down -- TODO confirm. */
4569static void
4571 struct TransportClient *tc);
4572
4573
/**
 * Scheduler task: walk all queues of a communicator client and release the
 * queue entries that are considered timed out.
 *
 * NOTE(review): the function-name line, the declaration of `now`, the
 * computation of the age and the timeout condition itself are on lines
 * missing from this extract.
 *
 * @param cls the `struct TransportClient` of the communicator
 */
4574static void
4576{
4577 struct TransportClient *tc = cls;
4579
4581 "freeing timedout queue entries\n");
4582
 /* this task is running now, so the stored handle is stale */
4583 tc->details.communicator.free_queue_entry_task = NULL;
4584 for (struct Queue *queue = tc->details.communicator.queue_head; NULL != queue;
4585 queue = queue->next_client)
4586 {
4587 struct QueueEntry *qep = queue->queue_head;
4588
4590 "checking QID %u for timedout queue entries\n",
4591 queue->qid);
4592 while (NULL != qep)
4593 {
4594 struct QueueEntry *pos = qep;
4595
 /* advance first: pos may be freed below */
4596 qep = qep->next;
4598 pos->creation_timestamp, now);
4599
4601 "diff to now %s \n",
4604 {
4606 "Freeing timed out QueueEntry with MID %" PRIu64
4607 " and QID %u\n",
4608 pos->mid,
4609 queue->qid);
4610 free_queue_entry (pos, tc);
4611 }
4612 }
4613 }
4614}
4615
4616
/**
 * Hand @a payload to the communicator that owns a queue: build the send
 * message, create a QueueEntry tracking it, update the queue's length,
 * capacity and idle state, and send the envelope on the communicator's MQ.
 *
 * NOTE(review): the function-name line with the `queue` parameter, the
 * declaration of `smt` and the heads of the log calls are missing from
 * this extract; the name queue_send_msg is taken from a call site
 * elsewhere in this file.
 *
 * @param pm pending message this transmission belongs to, may be NULL
 * @param payload bytes to transmit (read as a message header for logging)
 * @param payload_size number of bytes in @a payload
 */
4626static void
4628 struct PendingMessage *pm,
4629 const void *payload,
4630 size_t payload_size)
4631{
4632 struct Neighbour *n = queue->neighbour;
4634 struct GNUNET_MQ_Envelope *env;
4635 struct PendingAcknowledgement *pa;
4636
4637 GNUNET_log (
4639 "Queueing %u bytes of payload for transmission <%" PRIu64
4640 "> on queue %llu to %s\n",
4641 (unsigned int) payload_size,
4642 (NULL == pm) ? 0 : pm->logging_uuid,
4643 (unsigned long long) queue->qid,
4644 GNUNET_i2s (&queue->neighbour->pid));
4645 env = GNUNET_MQ_msg_extra (smt,
4646 payload_size,
4648 smt->qid = htonl (queue->qid);
4649 smt->mid = GNUNET_htonll (queue->mid_gen);
4650 smt->receiver = n->pid;
4651 memcpy (&smt[1], payload, payload_size);
4652 {
4653 /* Pass the env to the communicator of queue for transmission. */
4654 struct QueueEntry *qe;
4655
4656 qe = GNUNET_new (struct QueueEntry);
4657 qe->creation_timestamp = GNUNET_TIME_absolute_get ();
4658 qe->mid = queue->mid_gen;
4660 "Create QueueEntry with MID %" PRIu64
4661 " and QID %u and prefix %s\n",
4662 qe->mid,
4663 queue->qid,
4664 queue->tc->details.communicator.address_prefix);
4665 queue->mid_gen++;
4666 qe->queue = queue;
4667 if (NULL != pm)
4668 {
4669 qe->pm = pm;
4670 // TODO Why do we have a retransmission. When we know, make decision if we still want this.
4671 // GNUNET_assert (NULL == pm->qe);
 /* detach pm from an earlier queue entry that still points at it */
4672 if (NULL != pm->qe)
4673 {
4675 "Retransmitting message <%" PRIu64
4676 "> remove pm from qe with MID: %llu \n",
4677 pm->logging_uuid,
4678 (unsigned long long) pm->qe->mid);
4679 pm->qe->pm = NULL;
4680 }
4681 pm->qe = qe;
4682 }
4683 GNUNET_assert (CT_COMMUNICATOR == queue->tc->type);
 /* No capacity: undo the bookkeeping done so far and drop the message.
    NOTE(review): env was obtained from GNUNET_MQ_msg_extra() but is
    released with GNUNET_free(); confirm this is the intended way to
    dispose of an unsent envelope. Note also that mid_gen stays
    incremented on this path. */
4684 if (0 == queue->q_capacity)
4685 {
4686 // Messages without FC or fragments can get here.
4687 if (NULL != pm)
4688 {
4690 "Message %" PRIu64
4691 " (pm type %u) was not send because queue has no capacity.\n",
4692 pm->logging_uuid,
4693 pm->pmt);
4694 pm->qe = NULL;
4695 }
4696 GNUNET_free (env);
4697 GNUNET_free (qe);
4698 return;
4699 }
4700 GNUNET_CONTAINER_DLL_insert (queue->queue_head, queue->queue_tail, qe);
4701 queue->queue_length++;
4702 queue->tc->details.communicator.total_queue_length++;
4703 if (GNUNET_NO == queue->unlimited_length)
4704 queue->q_capacity--;
4706 "Queue %s with qid %u has capacity %" PRIu64 "\n",
4707 queue->address,
4708 queue->qid,
4709 queue->q_capacity);
 /* the queue stops being idle once a length limit is reached or the
    capacity is used up */
4711 queue->tc->details.communicator.total_queue_length)
4712 queue->idle = GNUNET_NO;
4713 if (QUEUE_LENGTH_LIMIT == queue->queue_length)
4714 queue->idle = GNUNET_NO;
4715 if (0 == queue->q_capacity)
4716 queue->idle = GNUNET_NO;
4717
 /* a busy queue gets a cleanup task, unless one is already pending */
4718 if (GNUNET_NO == queue->idle)
4719 {
4720 struct TransportClient *tc = queue->tc;
4721
4722 if (NULL == tc->details.communicator.free_queue_entry_task)
4723 tc->details.communicator.free_queue_entry_task =
4725 &
4727 tc);
4728 }
 /* NOTE(review): the walk below assumes the pa list contains an entry
    whose pm equals pm; there is no NULL check on pa inside the loop. */
4729 if (NULL != pm && NULL != (pa = pm->pa_head))
4730 {
4731 while (pm != pa->pm)
4732 pa = pa->next_pa;
4733 pa->num_send++;
4734 }
4735 // GNUNET_CONTAINER_multiuuidmap_get (pending_acks, &ack[i].ack_uuid.value);
4737 "Sending message MID %" PRIu64
4738 " of type %u (%u) and size %lu with MQ %p queue %s (QID %u) pending %"
4739 PRIu64 "\n",
4740 GNUNET_ntohll (smt->mid),
4741 ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
4742 ntohs (smt->header.size),
4743 (unsigned long) payload_size,
4744 queue->tc->mq,
4745 queue->address,
4746 queue->qid,
4747 (NULL == pm) ? 0 : pm->logging_uuid);
4748 GNUNET_MQ_send (queue->tc->mq, env);
4749 }
4750}
4751
4752
/**
 * Send @a hdr to a direct neighbour over one (or, with RMO_REDUNDANT, up to
 * two) of its queues and report the smallest aged RTT of the queues used.
 *
 * NOTE(review): the function-name line with the neighbour parameter (`n`),
 * the `options` parameter, the early return for the no-candidate case and
 * the statements that initialise `rtt`, `sel1` and (in the redundant case)
 * `sel2` are on lines missing from this extract; the name
 * route_via_neighbour is taken from call sites elsewhere in this file.
 *
 * @param hdr the message to transmit; its size field gives the length
 * @return minimum of the aged RTTs of the queues the message was sent on
 */
4763static struct GNUNET_TIME_Relative
4765 const struct GNUNET_MessageHeader *hdr,
4767{
4768 struct GNUNET_TIME_Absolute now;
4769 unsigned int candidates;
4770 unsigned int sel1;
4771 unsigned int sel2;
4772 struct GNUNET_TIME_Relative rtt;
4773
4774 /* Pick one or two 'random' queues from n (under constraints of options) */
4775 now = GNUNET_TIME_absolute_get ();
4776 /* FIXME-OPTIMIZE: give queues 'weights' and pick proportional to
4777 weight in the future; weight could be assigned by observed
4778 bandwidth (note: not sure if we should do this for this type
4779 of control traffic though). */
 /* count the queues that are usable under the given options */
4780 candidates = 0;
4781 for (struct Queue *pos = n->queue_head; NULL != pos;
4782 pos = pos->next_neighbour)
4783 {
4784 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4785 (pos->validated_until.abs_value_us > now.abs_value_us))
4786 candidates++;
4787 }
4788 if (0 == candidates)
4789 {
4790 /* This can happen rarely if the last confirmed queue timed
4791 out just as we were beginning to process this message. */
4793 "Could not route message of type %u to %s: no valid queue\n",
4794 ntohs (hdr->type),
4795 GNUNET_i2s (&n->pid));
4797 "# route selection failed (all no valid queue)",
4798 1,
4799 GNUNET_NO);
4801 }
4802
4805 if (0 == (options & RMO_REDUNDANT))
4806 sel2 = candidates; /* picks none! */
4807 else
 /* second pass: re-enumerate the usable queues with the same filter and
    send on those whose index equals sel1 or sel2 */
4809 candidates = 0;
4810 for (struct Queue *pos = n->queue_head; NULL != pos;
4811 pos = pos->next_neighbour)
4812 {
4813 if ((0 != (options & RMO_UNCONFIRMED_ALLOWED)) ||
4814 (pos->validated_until.abs_value_us > now.abs_value_us))
4815 {
4816 if ((sel1 == candidates) || (sel2 == candidates))
4817 {
4819 "Routing message of type %u to %s using %s (#%u)\n",
4820 ntohs (hdr->type),
4821 GNUNET_i2s (&n->pid),
4822 pos->address,
4823 (sel1 == candidates) ? 1 : 2);
4824 rtt = GNUNET_TIME_relative_min (rtt, pos->pd.aged_rtt);
4825 queue_send_msg (pos, NULL, hdr, ntohs (hdr->size));
4826 }
4827 candidates++;
4828 }
4829 }
4830 return rtt;
4831}
4832
4833
/* Key state used for encrypting and authenticating DV payloads: a libgcrypt
   cipher handle plus the derived key material.
   NOTE(review): the line that opens this definition (presumably
   `struct DVKeyState`), the first member of the inner struct (other code
   uses `material.hmac_key`) and the inner struct's closing line (other
   code uses the member name `material`) are missing from this extract. */
4838{
 /* cipher handle; opened by key setup and closed by dv_key_clean() */
4842 gcry_cipher_hd_t cipher;
4843
4847 struct
4848 {
4853
 /* 256-bit key handed to gcry_cipher_setkey() */
4857 char aes_key[256 / 8];
4858
 /* 128-bit value handed to gcry_cipher_setctr() */
4862 char aes_ctr[128 / 8];
4864};
4865
4866
/**
 * Derive the key material for a DV box from key material @a km and @a iv
 * and open an AES-256 cipher in CTR mode with it.
 *
 * NOTE(review): the function-name line with the `km` parameter and the
 * opening of the assertion around the KDF call are missing from this
 * extract; the name dv_setup_key_state_from_km is taken from a call site
 * elsewhere in this file.
 *
 * @param iv initialization vector mixed into the key derivation
 * @param key key state to initialise; release with dv_key_clean()
 */
4875static void
4877 const struct GNUNET_ShortHashCode *iv,
4878 struct DVKeyState *key)
4879{
4880 /* must match what we derive from decapsulated key */
4882 GNUNET_CRYPTO_kdf (&key->material,
4883 sizeof(key->material),
4884 "transport-backchannel-key",
4885 strlen ("transport-backchannel-key"),
4886 km,
4887 sizeof(*km),
4888 iv,
4889 sizeof(*iv),
4890 NULL));
4892 "Deriving backchannel key based on KM %s and IV %s\n",
4893 GNUNET_h2s (km),
4894 GNUNET_sh2s (iv));
4895 GNUNET_assert (0 == gcry_cipher_open (&key->cipher,
4896 GCRY_CIPHER_AES256 /* low level: go for speed */,
4897 GCRY_CIPHER_MODE_CTR,
4898 0 /* flags */));
4899 GNUNET_assert (0 == gcry_cipher_setkey (key->cipher,
4900 &key->material.aes_key,
4901 sizeof(key->material.aes_key)));
 /* NOTE(review): unlike open/setkey above, the result of setctr is not
    checked. */
4902 gcry_cipher_setctr (key->cipher,
4903 &key->material.aes_ctr,
4904 sizeof(key->material.aes_ctr));
4905}
4906
4907
/**
 * Compute the HMAC of a buffer using the HMAC key stored in @a key.
 *
 * @param key key state providing `material.hmac_key`
 * @param hmac[out] where the resulting HMAC is written
 * @param data buffer to authenticate
 * @param data_size number of bytes in @a data
 */
4917static void
4918dv_hmac (const struct DVKeyState *key,
4919 struct GNUNET_HashCode *hmac,
4920 const void *data,
4921 size_t data_size)
4922{
4923 GNUNET_CRYPTO_hmac (&key->material.hmac_key, data, data_size, hmac);
4924}
4925
4926
/**
 * Encrypt @a in_size bytes from @a in into @a dst with the cipher held in
 * @a key. A failure reported by libgcrypt trips the assertion.
 *
 * @param key key state whose cipher handle is used
 * @param in plaintext input
 * @param dst output buffer, must hold at least @a in_size bytes
 * @param in_size number of bytes to encrypt
 */
4936static void
4937dv_encrypt (struct DVKeyState *key, const void *in, void *dst, size_t in_size)
4938{
4939 GNUNET_assert (0 ==
4940 gcry_cipher_encrypt (key->cipher, dst, in_size, in, in_size));
4941}
4942
4943
/**
 * Decrypt @a out_size bytes of @a ciph into @a out with the cipher held in
 * the key state. Unlike dv_encrypt(), a libgcrypt failure is reported to
 * the caller instead of asserting.
 *
 * NOTE(review): the function-name line with the `key` parameter is missing
 * from this extract; presumably this is dv_decrypt -- TODO confirm.
 *
 * @param out[out] buffer for the plaintext, at least @a out_size bytes
 * @param ciph ciphertext, @a out_size bytes
 * @param out_size number of bytes to decrypt
 * @return #GNUNET_OK on success, #GNUNET_SYSERR if decryption failed
 */
4954static enum GNUNET_GenericReturnValue
4956 void *out,
4957 const void *ciph,
4958 size_t out_size)
4959{
4960 return (0 ==
4961 gcry_cipher_decrypt (key->cipher,
4962 out, out_size,
4963 ciph, out_size)) ? GNUNET_OK : GNUNET_SYSERR;
4964}
4965
4966
/**
 * Release the cipher handle of a key state and wipe its key material.
 * The key state structure itself is not freed here.
 *
 * NOTE(review): the function-name line with the `key` parameter is missing
 * from this extract; the name dv_key_clean is taken from a call site
 * elsewhere in this file.
 */
4972static void
4974{
4975 gcry_cipher_close (key->cipher);
4976 GNUNET_CRYPTO_zero_keys (&key->material, sizeof(key->material));
4977}
4978
4979
/**
 * Callback type used to hand a fully built DV message to the next hop.
 * NOTE(review): the final parameter line of this typedef is missing from
 * this extract (callers pass routing options as a fourth argument).
 *
 * cls: closure of the caller
 * next_hop: neighbour the message should be sent to
 * hdr: the message to send
 */
4990typedef void (*DVMessageHandler) (void *cls,
4991 struct Neighbour *next_hop,
4992 const struct GNUNET_MessageHeader *hdr,
4994
/**
 * Wrap @a hdr into a DV box: encrypt payload header and body once, then for
 * every selected path build a message consisting of box header, hop list
 * and ciphertext, and pass it to @a use.
 *
 * NOTE(review): the function-name line with the `dv` parameter, the
 * `options` parameter, the IV generation call head, the initialisation of
 * `rtt` and several other lines are missing from this extract; the name
 * encapsulate_for_dv is taken from a call site elsewhere in this file.
 *
 * @param num_dvhs number of entries in @a dvhs
 * @param dvhs paths to send the boxed message over
 * @param hdr message to encapsulate
 * @param use callback invoked once per path with the finished message
 * @param use_cls closure for @a use
 * @param without_fc stored (network byte order) in the box header
 * @return minimum aged RTT over the paths used
 */
5009static struct GNUNET_TIME_Relative
5011 unsigned int num_dvhs,
5012 struct DistanceVectorHop **dvhs,
5013 const struct GNUNET_MessageHeader *hdr,
5014 DVMessageHandler use,
5015 void *use_cls,
5017 enum GNUNET_GenericReturnValue without_fc)
5018{
5019 struct TransportDVBoxMessage box_hdr;
5020 struct TransportDVBoxPayloadP payload_hdr;
5021 uint16_t enc_body_size = ntohs (hdr->size);
 /* variable-length stack buffer: payload header followed by the body */
5022 char enc[sizeof(struct TransportDVBoxPayloadP) + enc_body_size] GNUNET_ALIGN;
5023 struct DVKeyState *key;
5024 struct GNUNET_TIME_Relative rtt;
5025 struct GNUNET_HashCode km;
5026
5027 key = GNUNET_new (struct DVKeyState);
5028 /* Encrypt payload */
5030 box_hdr.total_hops = htons (0);
5031 box_hdr.without_fc = htons (without_fc);
5032 // update_ephemeral (dv);
 /* ephemeral key expired: encapsulate a fresh one and re-sign.
    NOTE(review): dv->km is assigned a new allocation here; no release of
    a previous value is visible in this block -- check for a leak. */
5033 if (0 ==
5034 GNUNET_TIME_absolute_get_remaining (dv->ephemeral_validity).rel_value_us)
5035 {
5036 GNUNET_CRYPTO_eddsa_kem_encaps (&dv->target.public_key,
5037 &dv->ephemeral_key,
5038 &km);
5039 dv->km = GNUNET_new (struct GNUNET_HashCode);
5040 GNUNET_memcpy (dv->km, &km, sizeof(struct GNUNET_HashCode));
5041 sign_ephemeral (dv);
5042 }
5043 box_hdr.ephemeral_key = dv->ephemeral_key;
5044 payload_hdr.sender_sig = dv->sender_sig;
5045
5047 &box_hdr.iv,
5048 sizeof(box_hdr.iv));
5049 // We are creating this key, so this must work.
5050 // FIXME: Possibly also add return values here. We are processing
5051 // Input from other peers...
5052 dv_setup_key_state_from_km (dv->km, &box_hdr.iv, key);
5053 payload_hdr.sender = GST_my_identity;
5054 payload_hdr.monotonic_time = GNUNET_TIME_absolute_hton (dv->monotime);
 /* header and body are encrypted back to back with the same cipher state,
    then the HMAC is computed over the whole ciphertext */
5055 dv_encrypt (key, &payload_hdr, enc, sizeof(payload_hdr));
5056 dv_encrypt (key,
5057 hdr,
5058 &enc[sizeof(struct TransportDVBoxPayloadP)],
5059 enc_body_size);
5060 dv_hmac (key, &box_hdr.hmac, enc, sizeof(enc));
5061 dv_key_clean (key);
5063 /* For each selected path, take the pre-computed header and body
5064 and add the path in the middle of the message; then send it. */
5065 for (unsigned int i = 0; i < num_dvhs; i++)
5066 {
5067 struct DistanceVectorHop *dvh = dvhs[i];
 /* the hop list holds the path plus the final target */
5068 unsigned int num_hops = dvh->distance + 1;
5069 char buf[sizeof(struct TransportDVBoxMessage)
5070 + sizeof(struct GNUNET_PeerIdentity) * num_hops
5071 + sizeof(struct TransportDVBoxPayloadP)
5072 + enc_body_size] GNUNET_ALIGN;
5073 struct GNUNET_PeerIdentity *dhops;
5074
5075 box_hdr.header.size = htons (sizeof(buf));
5076 box_hdr.orig_size = htons (sizeof(buf));
5077 box_hdr.num_hops = htons (num_hops);
5078 memcpy (buf, &box_hdr, sizeof(box_hdr));
5079 dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
5080 memcpy (dhops,
5081 dvh->path,
5082 dvh->distance * sizeof(struct GNUNET_PeerIdentity));
5083 dhops[dvh->distance] = dv->target;
 /* build a printable "-peer-peer..." path string for the log only */
5084 if (GNUNET_EXTRA_LOGGING > 0)
5085 {
5086 char *path;
5087
5089 for (unsigned int j = 0; j < num_hops; j++)
5090 {
5091 char *tmp;
5092
5093 GNUNET_asprintf (&tmp, "%s-%s", path, GNUNET_i2s (&dhops[j]));
5094 GNUNET_free (path);
5095 path = tmp;
5096 }
5098 "Routing message of type %u to %s using DV (#%u/%u) via %s\n",
5099 ntohs (hdr->type),
5100 GNUNET_i2s (&dv->target),
5101 i + 1,
5102 num_dvhs,
5103 path);
5104 GNUNET_free (path);
5105 }
5106 rtt = GNUNET_TIME_relative_min (rtt, dvh->pd.aged_rtt);
 /* ciphertext goes right behind the hop list */
5107 memcpy (&dhops[num_hops], enc, sizeof(enc));
5108 use (use_cls,
5109 dvh->next_hop,
5110 (const struct GNUNET_MessageHeader *) buf,
5111 options);
 /* NOTE(review): key is allocated once before the loop but released
    inside it, so it is freed on every iteration and never when
    num_dvhs is 0; confirm that GNUNET_free() resets the pointer so
    repeated iterations are harmless. */
5112 GNUNET_free (key);
5113 }
5114 return rtt;
5115}
5116
5117
/**
 * DVMessageHandler-style callback that forwards a DV message to the next
 * hop via route_via_neighbour(), always allowing unconfirmed queues; the
 * returned RTT is discarded.
 *
 * NOTE(review): the function-name line with the `cls` parameter and the
 * trailing options parameter are missing from this extract; presumably
 * this is send_dv_to_neighbour -- TODO confirm.
 *
 * @param next_hop neighbour to send to
 * @param hdr message to send
 */
5127static void
5129 struct Neighbour *next_hop,
5130 const struct GNUNET_MessageHeader *hdr,
5132{
5133 (void) cls;
5134 (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
5135}
5136
5137
/**
 * Route a control message to the target of a virtual link without applying
 * flow control, using the direct neighbour, a DV path, or (with
 * RMO_REDUNDANT) both.
 *
 * NOTE(review): the active function-name line with the `vl` parameter, the
 * `options` parameter, the early returns, the coin flip and the
 * initialisation of `rtt1`/`rtt2` are on lines missing from this extract;
 * the name is taken from the commented-out signature below.
 *
 * @param hdr the message to route
 * @return minimum of the RTTs reported by the routes used
 */
5149static struct GNUNET_TIME_Relative
5151// route_control_message_without_fc (const struct GNUNET_PeerIdentity *target,
5152 const struct GNUNET_MessageHeader *hdr,
5154{
5155 // struct VirtualLink *vl;
5156 struct Neighbour *n;
5157 struct DistanceVector *dv;
5158 struct GNUNET_TIME_Relative rtt1;
5159 struct GNUNET_TIME_Relative rtt2;
 /* NOTE(review): vl is used here, before the assertion below checks it
    for NULL. */
5160 const struct GNUNET_PeerIdentity *target = &vl->target;
5161
5163 "Trying to route message of type %u to %s without fc\n",
5164 ntohs (hdr->type),
5165 GNUNET_i2s (target));
5166
5167 // TODO Do this elsewhere. vl should be given as parameter to method.
5168 // vl = lookup_virtual_link (target);
5169 GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
 /* NOTE(review): this test cannot be true after the assertion above. */
5170 if (NULL == vl)
5172 n = vl->n;
5173 dv = (0 != (options & RMO_DV_ALLOWED)) ? vl->dv : NULL;
5174 if (0 == (options & RMO_UNCONFIRMED_ALLOWED))
5175 {
5176 /* if confirmed is required, and we do not have anything
5177 confirmed, drop respective options */
 /* NOTE(review): the visible code looks up a neighbour here rather
    than dropping options as the comment above says -- confirm which
    is intended. */
5178 if (NULL == n)
5179 n = lookup_neighbour (target);
5180 if ((NULL == dv) && (0 != (options & RMO_DV_ALLOWED)))
5182 }
5183 if ((NULL == n) && (NULL == dv))
5184 {
5186 "Cannot route message of type %u to %s: no route\n",
5187 ntohs (hdr->type),
5188 GNUNET_i2s (target));
5190 "# Messages dropped in routing: no acceptable method",
5191 1,
5192 GNUNET_NO);
5194 }
5196 "Routing message of type %u to %s with options %X\n",
5197 ntohs (hdr->type),
5198 GNUNET_i2s (target),
5199 (unsigned int) options);
5200 /* If both dv and n are possible and we must choose:
5201 flip a coin for the choice between the two; for now 50/50 */
5202 if ((NULL != n) && (NULL != dv) && (0 == (options & RMO_REDUNDANT)))
5203 {
5205 n = NULL;
5206 else
5207 dv = NULL;
5208 }
5209 if ((NULL != n) && (NULL != dv))
5210 options &= ~RMO_REDUNDANT; /* We will do one DV and one direct, that's
5211 enough for redundancy, so clear the flag. */
5214 if (NULL != n)
5215 {
5217 "Try to route message of type %u to %s without fc via neighbour\n",
5218 ntohs (hdr->type),
5219 GNUNET_i2s (target));
5220 rtt1 = route_via_neighbour (n, hdr, options);
5221 }
5222 if (NULL != dv)
5223 {
 /* at most two DV paths are requested (two only with RMO_REDUNDANT) */
5224 struct DistanceVectorHop *hops[2];
5225 unsigned int res;
5226
5228 options,
5229 hops,
5230 (0 == (options & RMO_REDUNDANT)) ? 1 : 2);
5231 if (0 == res)
5232 {
5234 "Failed to route message, could not determine DV path\n");
5235 return rtt1;
5236 }
5238 "encapsulate_for_dv 1\n");
5239 rtt2 = encapsulate_for_dv (dv,
5240 res,
5241 hops,
5242 hdr,
5244 NULL,
5246 GNUNET_YES);
5247 }
5248 return GNUNET_TIME_relative_min (rtt1, rtt2);
5249}
5250
5251
/* Forward declaration: the retransmission task defined right below calls
   consider_sending_fc() before its definition appears. */
5252static void
5253consider_sending_fc (void *cls);
5254
/**
 * Scheduler task wrapper: clear the link's stored retransmission task
 * handle (the task is running now) and call consider_sending_fc().
 *
 * NOTE(review): the function-name line is missing from this extract;
 * presumably this is task_consider_sending_fc -- TODO confirm.
 * The closure (`cls`) is the `struct VirtualLink`.
 */
5261static void
5263{
5264 struct VirtualLink *vl = cls;
5265 vl->fc_retransmit_task = NULL;
5266 consider_sending_fc (cls);
5267}
5268
5269
/**
 * Build and send a flow-control message for a virtual link, record the
 * transmission time and resulting RTT, and (re)arm the retransmission
 * task.
 *
 * NOTE(review): large parts of this function are missing from this extract
 * (the name line, the declarations of `fc` and `duration`, how `monotime`
 * and `rtt` are obtained, most `fc` fields, and the scheduling calls), so
 * only the visible statements are described.
 * The closure (`cls`) is the `struct VirtualLink`.
 */
5276static void
5278{
5279 struct VirtualLink *vl = cls;
5280 struct GNUNET_TIME_Absolute monotime;
5283 struct GNUNET_TIME_Relative rtt;
5284
5286 /* OPTIMIZE-FC-BDP: decide sane criteria on when to do this, instead of doing
5287 it always! */
5288 /* For example, we should probably ONLY do this if a bit more than
5289 an RTT has passed, or if the window changed "significantly" since
5290 then. See vl->last_fc_rtt! NOTE: to do this properly, we also
5291 need an estimate for the bandwidth-delay-product for the entire
5292 VL, as that determines "significantly". We have the delay, but
5293 the bandwidth statistics need to be added for the VL!*/(void) duration;
5294
5296 "Sending FC seq %u to %s with new window %llu\n",
5297 (unsigned int) vl->fc_seq_gen,
5298 GNUNET_i2s (&vl->target),
5299 (unsigned long long) vl->incoming_fc_window_size);
5301 vl->last_fc_transmission = monotime;
5303 fc.header.size = htons (sizeof(fc));
 /* every FC message gets the next sequence number */
5304 fc.seq = htonl (vl->fc_seq_gen++);
5310 fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
 /* an RTT of "forever" is treated as a failed transmission */
5312 if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
5313 {
5316 "FC retransmission to %s failed, will retry in %s\n",
5317 GNUNET_i2s (&vl->target),
5320 }
5321 else
5322 {
5323 /* OPTIMIZE-FC-BDP: rtt is not ideal, we can do better! */
5324 vl->last_fc_rtt = rtt;
5325 }
5326 if (NULL != vl->fc_retransmit_task)
5329 {
5331 vl->fc_retransmit_count = 0;
5332 }
5333 vl->fc_retransmit_task =
5335 vl->fc_retransmit_count++;
5336}
5337
5338
/**
 * Look for a pending message on a virtual link that is not yet in a queue
 * and fits the outbound flow-control window; if there is one, notify idle,
 * validated queues of the direct neighbour and of DV next hops.
 *
 * NOTE(review): the function-name line with the `vl` parameter, the heads
 * of the log calls, the scheduling calls made for eligible queues and part
 * of the window comparison are missing from this extract; the name is
 * taken from the log strings below.
 */
5355static void
5357{
5358 struct Neighbour *n = vl->n;
5359 struct DistanceVector *dv = vl->dv;
5360 struct GNUNET_TIME_Absolute now;
5361 struct VirtualLink *vl_next_hop;
5362 int elig;
5363
5365 "check_vl_transmission to target %s\n",
5366 GNUNET_i2s (&vl->target));
5367 /* Check that we have an eligible pending message!
5368 (cheaper than having #transmit_on_queue() find out!) */
5369 elig = GNUNET_NO;
5370 for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
5371 pm = pm->next_vl)
5372 {
5374 "check_vl_transmission loop\n");
5375 if (NULL != pm->qe)
5376 continue; /* not eligible, is in a queue! */
5377 if (pm->bytes_msg + vl->outbound_fc_window_size_used >
5379 {
5381 "Stalled message %" PRIu64
5382 " transmission on VL %s due to flow control: %llu < %llu\n",
5383 pm->logging_uuid,
5384 GNUNET_i2s (&vl->target),
5385 (unsigned long long) vl->outbound_fc_window_size,
5386 (unsigned long long) (pm->bytes_msg
5389 return; /* We have a message, but flow control says "nope" */
5390 }
5392 "Target window on VL %s not stalled. Scheduling transmission on queue\n",
5393 GNUNET_i2s (&vl->target));
5394 /* Notify queues at direct neighbours that we are interested */
5395 now = GNUNET_TIME_absolute_get ();
5396 if (NULL != n)
5397 {
5398 for (struct Queue *queue = n->queue_head; NULL != queue;
5399 queue = queue->next_neighbour)
5400 {
 /* only idle queues whose validation has not expired qualify */
5401 if ((GNUNET_YES == queue->idle) &&
5402 (queue->validated_until.abs_value_us > now.abs_value_us))
5403 {
5405 "Direct neighbour %s not stalled\n",
5406 GNUNET_i2s (&n->pid));
5408 queue,
5410 elig = GNUNET_YES;
5411 }
5412 else
5414 "Neighbour Queue QID: %u (%u) busy or invalid\n",
5415 queue->qid,
5416 queue->idle);
5417 }
5418 }
5419 /* Notify queues via DV that we are interested */
5420 if (NULL != dv)
5421 {
5422 /* Do DV with lower scheduler priority, which effectively means that
5423 IF a neighbour exists and is available, we prefer it. */
5424 for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
5425 pos = pos->next_dv)
5426 {
5427 struct Neighbour *nh = pos->next_hop;
5428
5429
5430 if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
5431 continue; /* skip this one: path not validated */
5432 else
5433 {
 /* the first hop has its own virtual link and flow-control window,
    which must also have room for this message */
5434 vl_next_hop = lookup_virtual_link (&nh->pid);
5435 GNUNET_assert (NULL != vl_next_hop);
5436 if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
5437 vl_next_hop->outbound_fc_window_size)
5438 {
5440 "Stalled message %" PRIu64
5441 " transmission on next hop %s due to flow control: %llu < %llu\n",
5442 pm->logging_uuid,
5443 GNUNET_i2s (&vl_next_hop->target),
5444 (unsigned long
5445 long) vl_next_hop->outbound_fc_window_size,
5446 (unsigned long long) (pm->bytes_msg
5447 + vl_next_hop->
5448 outbound_fc_window_size_used));
5449 consider_sending_fc (vl_next_hop);
5450 continue; /* We have a message, but flow control says "nope" for the first hop of this path */
5451 }
5452 for (struct Queue *queue = nh->queue_head; NULL != queue;
5453 queue = queue->next_neighbour)
5454 if ((GNUNET_YES == queue->idle) &&
5455 (queue->validated_until.abs_value_us > now.abs_value_us))
5456 {
5458 "Next hop neighbour %s not stalled\n",
5459 GNUNET_i2s (&nh->pid));
5461 queue,
5463 elig = GNUNET_YES;
5464 }
5465 else
5467 "DV Queue QID: %u (%u) busy or invalid\n",
5468 queue->qid,
5469 queue->idle);
5470 }
5471 }
5472 }
 /* NOTE(review): the statement controlled by this `if` begins on a line
    missing from this extract (presumably a log call), so the `break`
    below appears to be unconditional, i.e. only the first pending
    message that is not already queued is examined -- TODO confirm. */
5473 if (GNUNET_YES == elig)
5475 "Eligible message %" PRIu64 " of size %u to %s: %llu/%llu\n",
5476 pm->logging_uuid,
5477 pm->bytes_msg,
5478 GNUNET_i2s (&vl->target),
5479 (unsigned long long) vl->outbound_fc_window_size,
5480 (unsigned long long) (pm->bytes_msg
5482 break;
5483 }
5484}
5485
5486
/**
 * Handle a send request from a CORE client: copy the payload into a new
 * PendingMessage attached to the virtual link of the target peer, or drop
 * it if no confirmed link exists.
 *
 * NOTE(review): the declaration of `pp`, the heads of the log, statistics
 * and list-insertion calls and the final statements are on lines missing
 * from this extract.
 *
 * @param cls the `struct TransportClient` (must be of type CT_CORE)
 * @param obm the request; the message to send follows the struct
 */
5493static void
5494handle_client_send (void *cls, const struct OutboundMessage *obm)
5495{
5496 struct TransportClient *tc = cls;
5497 struct PendingMessage *pm;
5498 const struct GNUNET_MessageHeader *obmm;
5499 uint32_t bytes_msg;
5500 struct VirtualLink *vl;
5502
5503 GNUNET_assert (CT_CORE == tc->type);
 /* payload message sits directly behind the request header */
5504 obmm = (const struct GNUNET_MessageHeader *) &obm[1];
5505 bytes_msg = ntohs (obmm->size);
5506 pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
5507 vl = lookup_virtual_link (&obm->peer);
5508 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
5509 {
5511 "Don't have %s as a neighbour (anymore).\n",
5512 GNUNET_i2s (&obm->peer));
5513 /* Failure: don't have this peer as a neighbour (anymore).
5514 Might have gone down asynchronously, so this is NOT
5515 a protocol violation by CORE. Still count the event,
5516 as this should be rare. */
5519 "# messages dropped (neighbour unknown)",
5520 1,
5521 GNUNET_NO);
5522 return;
5523 }
5524
 /* one allocation: PendingMessage header followed by the payload copy */
5525 pm = GNUNET_malloc (sizeof(struct PendingMessage) + bytes_msg);
5527 "1 created pm %p storing vl %p\n",
5528 pm,
5529 vl);
5530 pm->logging_uuid = logging_uuid_gen++;
5531 pm->prefs = pp;
5532 pm->client = tc;
5533 pm->vl = vl;
5534 pm->bytes_msg = bytes_msg;
5535 memcpy (&pm[1], obmm, bytes_msg);
5537 "Sending message of type %u with %u bytes as <%" PRIu64
5538 "> to %s\n",
5539 ntohs (obmm->type),
5540 bytes_msg,
5541 pm->logging_uuid,
5542 GNUNET_i2s (&obm->peer));
 /* the message is linked into both the client's and the link's list */
5544 tc->details.core.pending_msg_head,
5545 tc->details.core.pending_msg_tail,
5546 pm);
5548 vl->pending_msg_head,
5549 vl->pending_msg_tail,
5550 pm);
5553}
5554
5555
/**
 * Handle a backchannel request from a communicator: wrap the inner message
 * and the trailing communicator string into a backchannel encapsulation
 * message and route it to the peer named in the request.
 *
 * NOTE(review): the function-name line, the message parameter (`cb`), the
 * end of the `mbuf` declaration, the declaration of `be`, the message type
 * value and the routing call heads are on lines missing from this extract.
 *
 * @param cls the `struct TransportClient` of the communicator
 */
5565static void
5567 void *cls,
5569{
5570 struct Neighbour *n;
5571 struct VirtualLink *vl;
5572 struct TransportClient *tc = cls;
5573 const struct GNUNET_MessageHeader *inbox =
5574 (const struct GNUNET_MessageHeader *) &cb[1];
5575 uint16_t isize = ntohs (inbox->size);
 /* the string starts right after the inner message */
5576 const char *is = ((const char *) &cb[1]) + isize;
5577 size_t slen = strlen (is) + 1;
5578 char
5579 mbuf[slen + isize
5580 + sizeof(struct
5584
5585 /* 0-termination of 'is' was checked already in
5586 #check_communicator_backchannel() */
5588 "Preparing backchannel transmission to %s:%s of type %u and size %u\n",
5589 GNUNET_i2s (&cb->pid),
5590 is,
5591 ntohs (inbox->type),
5592 ntohs (inbox->size));
5593 /* encapsulate and encrypt message */
5594 be->header.type =
5596 be->header.size = htons (sizeof(mbuf));
 /* layout: encapsulation header, inner message, then the string */
5597 memcpy (&be[1], inbox, isize);
5598 memcpy (&mbuf[sizeof(struct TransportBackchannelEncapsulationMessage)
5599 + isize],
5600 is,
5601 strlen (is) + 1);
5602 // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
5603 vl = lookup_virtual_link (&cb->pid);
5604 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
5605 {
5607 }
5608 else
5609 {
5610 /* Use route via neighbour */
5611 n = lookup_neighbour (&cb->pid);
5612 if (NULL != n)
5614 n,
5615 &be->header,
5616 RMO_NONE);
5617 }
5619}
5620
5621
/**
 * Validate an "add address" message: it is only accepted from clients that
 * registered as communicators.
 *
 * NOTE(review): the function-name line with the `cls` parameter and one
 * body line (listing number 5640) are missing from this extract; the name
 * check_add_address is taken from a comment elsewhere in this file, which
 * also states that the 0-termination of the address is checked here.
 *
 * @param aam the message to validate
 * @return #GNUNET_OK if acceptable, #GNUNET_SYSERR for a non-communicator
 */
5629static int
5631 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5632{
5633 struct TransportClient *tc = cls;
5634
5635 if (CT_COMMUNICATOR != tc->type)
5636 {
5637 GNUNET_break (0);
5638 return GNUNET_SYSERR;
5639 }
5641 return GNUNET_OK;
5642}
5643
5644
/* Forward declaration: the peerstore callbacks defined before store_pi()
   reschedule it as a task. */
5650static void
5651store_pi (void *cls);
5652
5653
/**
 * Continuation called when storing one of our own addresses finished:
 * clears the pending store handle, logs the outcome and schedules the next
 * run of store_pi() for the same address entry.
 *
 * NOTE(review): the log call heads and the head of the scheduling call
 * (including the base delay that is divided by 4) are on lines missing
 * from this extract.
 *
 * @param cls the `struct AddressListEntry` that was stored
 * @param success #GNUNET_YES if the store operation succeeded
 */
5660static void
5661peerstore_store_own_cb (void *cls, int success)
5662{
5663 struct AddressListEntry *ale = cls;
5664
 /* operation finished, handle no longer valid */
5665 ale->sc = NULL;
5666 if (GNUNET_YES != success)
5668 "Failed to store our own address `%s' in peerstore!\n",
5669 ale->address);
5670 else
5672 "Successfully stored our own address `%s' in peerstore!\n",
5673 ale->address);
5674 /* refresh period is 1/4 of expiration time, that should be plenty
5675 without being excessive. */
5676 ale->st =
5678 4ULL),
5679 &store_pi,
5680 ale);
5681}
5682
5683
/**
 * Continuation that starts storing the signed address of an address entry
 * under the "transport" subsystem; if no store handle is obtained,
 * store_pi() is scheduled again for the entry.
 *
 * NOTE(review): the declaration of `expiration`, the head of the store call
 * (whose result presumably ends up in ale->sc), the log call head and the
 * head of the rescheduling call are on lines missing from this extract.
 * The `success` argument is not used by any visible line.
 *
 * @param cls the `struct AddressListEntry`
 * @param success result reported to this continuation
 */
5684static void
5685shc_cont (void *cls, int success)
5686{
5687 struct AddressListEntry *ale = cls;
5689
5692 "transport",
5695 ale->signed_address,
5696 ale->signed_address_len,
5697 expiration,
5700 ale);
5701 if (NULL == ale->sc)
5702 {
5704 "Failed to store our address `%s' with peerstore\n",
5705 ale->address);
5707 &store_pi,
5708 ale);
5709 }
5710}
5711
5712
/**
 * Task that publishes one of our own addresses: builds a
 * "<prefix>://<rest>" URI from the part of the address after the first
 * '-', signs the address and hands the resulting message on, continuing in
 * shc_cont().
 *
 * NOTE(review): many lines are missing from this extract, including the
 * declaration and origin of `prefix`, the call that sets `add_success`,
 * the signing call head, the creation of `env`/`msg` and the head of the
 * call that takes shc_cont.
 *
 * @param cls the `struct AddressListEntry` to store
 */
5718static void
5719store_pi (void *cls)
5720{
5721 struct AddressListEntry *ale = cls;
5722 struct GNUNET_MQ_Envelope *env;
5723 const struct GNUNET_MessageHeader *msg;
5724 const char *dash;
5725 char *address_uri;
5727 unsigned int add_success;
5728
 /* the address must contain a '-'; what follows it goes into the URI */
5729 dash = strchr (ale->address, '-');
5730 GNUNET_assert (NULL != dash);
5731 dash++;
5732 GNUNET_asprintf (&address_uri,
5733 "%s://%s",
5734 prefix,
5735 dash);
 /* this task is running now, so the stored handle is stale */
5737 ale->st = NULL;
5739 "Storing our address `%s' in peerstore until %s!\n",
5740 ale->address,
5743 address_uri);
5744 if (GNUNET_OK != add_success)
5745 {
5747 "Storing our address `%s' %s\n",
5748 address_uri,
5749 GNUNET_NO == add_success ? "not done" : "failed");
5750 GNUNET_free (address_uri);
5751 return;
5752 }
5753 else
5754 {
5755
5757 "Storing our address `%s'\n",
5758 address_uri);
5759 }
5760 // FIXME hello_mono_time used here?? What about expiration in ale?
5762 ale->nt,
5765 &ale->signed_address,
5766 &ale->signed_address_len);
5767 GNUNET_free (address_uri);
5773 "store_pi 1\n");
5775 msg,
5776 shc_cont,
5777 ale);
 /* NOTE(review): env is a `struct GNUNET_MQ_Envelope *` released with
    GNUNET_free(); confirm this is the intended way to dispose of it. */
5778 GNUNET_free (env);
5779}
5780
5781
/**
 * Handle an "add address" message from a communicator: create an address
 * list entry holding a copy of the address string, link it into the
 * client's address list and schedule store_pi() for it right away.
 *
 * NOTE(review): the function-name line with the `cls` parameter, the log
 * call head and two further lines (listing numbers 5804 and 5812) are
 * missing from this extract.
 *
 * @param aam the message; the address string follows the struct
 */
5788static void
5790 const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
5791{
5792 struct TransportClient *tc = cls;
5793 struct AddressListEntry *ale;
5794 size_t slen;
5795
5796 /* 0-termination of &aam[1] was checked in #check_add_address */
5798 "Communicator added address `%s'!\n",
5799 (const char *) &aam[1]);
5800 slen = ntohs (aam->header.size) - sizeof(*aam);
 /* entry and address string share one allocation; ale->address points
    at the string stored right behind the struct */
5801 ale = GNUNET_malloc (sizeof(struct AddressListEntry) + slen);
5802 ale->tc = tc;
5803 ale->address = (const char *) &ale[1];
5805 ale->aid = aam->aid;
5806 ale->nt = (enum GNUNET_NetworkType) ntohl (aam->nt);
5807 memcpy (&ale[1], &aam[1], slen);
5808 GNUNET_CONTAINER_DLL_insert (tc->details.communicator.addr_head,
5809 tc->details.communicator.addr_tail,
5810 ale);
5811 ale->st = GNUNET_SCHEDULER_add_now (&store_pi, ale);
5813}
5814
5815
/**
 * Handle a "delete address" message from a communicator: find the address
 * list entry with the matching address id and remove it.
 *
 * NOTE(review): the function-name line with the `cls` parameter, the lines
 * that actually remove the entry and finish message processing, and the
 * log call heads are missing from this extract.
 *
 * @param dam the message; `aid` identifies the address to delete
 */
5822static void
5824 const struct GNUNET_TRANSPORT_DelAddressMessage *dam)
5825{
5826 struct TransportClient *tc = cls;
5827 struct AddressListEntry *alen;
5828
5829 if (CT_COMMUNICATOR != tc->type)
5830 {
5831 GNUNET_break (0);
5833 return;
5834 }
5835 for (struct AddressListEntry *ale = tc->details.communicator.addr_head;
5836 NULL != ale;
5837 ale = alen)
5838 {
 /* remember the successor before the current entry is handled */
5839 alen = ale->next;
5840 if (dam->aid != ale->aid)
5841 continue;
5842 GNUNET_assert (ale->tc == tc);
5844 "Communicator deleted address `%s'!\n",
5845 ale->address);
5848 return;
5849 }
 /* no entry matched the given address id */
5851 "Communicator removed address we did not even have.\n");
5853 // GNUNET_SERVICE_client_drop (tc->client);
5854}
5855
5856
5864static void
5866
5867
/**
 * Callback run with a `struct CoreSentContext`: moves the accounted bytes
 * from the link's RAM window (`size`) to its used window (`isize`) and
 * frees the context.
 *
 * NOTE(review): the function-name line and three body lines (listing
 * numbers 5887, 5888 and 5891) are missing from this extract; presumably
 * this is core_env_sent_cb -- TODO confirm.
 * The closure (`cls`) is the `struct CoreSentContext`.
 */
5875static void
5877{
5878 struct CoreSentContext *ctx = cls;
5879 struct VirtualLink *vl = ctx->vl;
5880
5881 if (NULL == vl)
5882 {
5883 /* lost the link in the meantime, ignore */
5884 GNUNET_free (ctx);
5885 return;
5886 }
5889 vl->incoming_fc_window_size_ram -= ctx->size;
5890 vl->incoming_fc_window_size_used += ctx->isize;
5892 GNUNET_free (ctx);
5893}
5894
5895
/**
 * Deliver a message received on a virtual link to all connected CORE
 * clients, subject to the link's incoming flow-control accounting.
 *
 * NOTE(review): the function-name line with the `vl` parameter, the second
 * window check's condition, the statements executed when `free_cmc` is
 * #GNUNET_YES, the envelope creation and most call heads are on lines
 * missing from this extract; presumably this is
 * finish_handling_raw_message -- TODO confirm.
 *
 * @param mh the message to deliver
 * @param cmc context of the communicator message being processed
 * @param free_cmc passed through to the (not visible) completion calls
 */
5896static void
5898 const struct GNUNET_MessageHeader *mh,
5899 struct CommunicatorMessageContext *cmc,
5900 unsigned int free_cmc)
5901{
5902 uint16_t size = ntohs (mh->size);
5903 int have_core;
5904
 /* refuse if adding size to the RAM window would overflow */
5905 if (vl->incoming_fc_window_size_ram > UINT_MAX - size)
5906 {
5908 "# CORE messages dropped (FC arithmetic overflow)",
5909 1,
5910 GNUNET_NO);
5912 "CORE messages of type %u with %u bytes dropped (FC arithmetic overflow)\n",
5913 (unsigned int) ntohs (mh->type),
5914 (unsigned int) ntohs (mh->size));
5915 if (GNUNET_YES == free_cmc)
5917 return;
5918 }
5920 {
5922 "# CORE messages dropped (FC window overflow)",
5923 1,
5924 GNUNET_NO);
5926 "CORE messages of type %u with %u bytes dropped (FC window overflow)\n",
5927 (unsigned int) ntohs (mh->type),
5928 (unsigned int) ntohs (mh->size));
5929 if (GNUNET_YES == free_cmc)
5931 return;
5932 }
5933
5934 /* Forward to all CORE clients */
5935 have_core = GNUNET_NO;
5936 for (struct TransportClient *tc = clients_head; NULL != tc; tc = tc->next)
5937 {
5938 struct GNUNET_MQ_Envelope *env;
5939 struct InboundMessage *im;
5940 struct CoreSentContext *ctx;
5941
5942 if (CT_CORE != tc->type)
5943 continue;
5946 ctx = GNUNET_new (struct CoreSentContext);
5947 ctx->vl = vl;
5948 ctx->size = size;
 /* only the first CORE client's context carries a non-zero isize */
5949 ctx->isize = (GNUNET_NO == have_core) ? size : 0;
5950 have_core = GNUNET_YES;
5953 im->peer = cmc->im.sender;
5954 memcpy (&im[1], mh, size);
5955 GNUNET_MQ_send (tc->mq, env);
5957 }
5958 if (GNUNET_NO == have_core)
5959 {
5961 "Dropped message to CORE: no CORE client connected!\n");
5962 /* Nevertheless, count window as used, as it is from the
5963 perspective of the other peer! */
5965 /* TODO-M1 */
5967 "Dropped message of type %u with %u bytes to CORE: no CORE client connected!\n",
5968 (unsigned int) ntohs (mh->type),
5969 (unsigned int) ntohs (mh->size));
5970 if (GNUNET_YES == free_cmc)
5972 return;
5973 }
5975 "Delivered message from %s of type %u to CORE recv window %d\n",
5976 GNUNET_i2s (&cmc->im.sender),
5977 ntohs (mh->type),
 /* CORE still has receive window left: processing can complete now */
5979 if (vl->core_recv_window > 0)
5980 {
5981 if (GNUNET_YES == free_cmc)
5983 return;
5984 }
5985 /* Wait with calling #finish_cmc_handling(cmc) until the message
5986 was processed by CORE MQs (for CORE flow control)! */
5987 if (GNUNET_YES == free_cmc)
5989}
5990
5991
/**
 * Handle a raw (unboxed) message `mh` received via a communicator.
 *
 * Messages whose size is below a message header or would not fit
 * together with a `struct InboundMessage` are rejected.  If there is no
 * confirmed virtual link to the sender, a copy of the message is parked
 * in `ring_buffer` together with the message context and the function
 * returns.
 *
 * NOTE(review): the line with the function name and parameters is
 * missing from this excerpt, as is the final statement for the case
 * where a confirmed virtual link exists (presumably the hand-over to
 * CORE) -- confirm against the full source.
 *
 * The body reads `cls` as the `struct CommunicatorMessageContext *` and
 * `mh` as the message.
 */
6000static void
6002{
6003 struct CommunicatorMessageContext *cmc = cls;
6004 // struct CommunicatorMessageContext *cmc_copy =
6005 // GNUNET_new (struct CommunicatorMessageContext);
6006 struct GNUNET_MessageHeader *mh_copy;
6007 struct RingBufferEntry *rbe;
6008 struct VirtualLink *vl;
6009 uint16_t size = ntohs (mh->size);
6010
6012 "Handling raw message of type %u with %u bytes\n",
6013 (unsigned int) ntohs (mh->type),
6014 (unsigned int) ntohs (mh->size));
6015
/* Size sanity check: reject messages that are too large or too small. */
6016 if ((size > UINT16_MAX - sizeof(struct InboundMessage)) ||
6017 (size < sizeof(struct GNUNET_MessageHeader)))
6018 {
6019 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6020
6021 GNUNET_break (0);
6022 finish_cmc_handling (cmc);
6024 return;
6025 }
6026 vl = lookup_virtual_link (&cmc->im.sender);
6027 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6028 {
6029 /* FIXME: sender is giving us messages for CORE but we don't have
6030 the link up yet! I *suspect* this can happen right now (i.e.
6031 sender has verified us, but we didn't verify sender), but if
6032 we pass this on, CORE would be confused (link down, messages
6033 arrive). We should investigate more if this happens often,
6034 or in a persistent manner, and possibly do "something" about
6035 it. Thus logging as error for now. */
6036
/* Keep a private copy of the message; the ring buffer entry owns both
   the copy and the message context. */
6037 mh_copy = GNUNET_malloc (size);
6038 rbe = GNUNET_new (struct RingBufferEntry);
6039 rbe->cmc = cmc;
6040 /*cmc_copy->tc = cmc->tc;
6041 cmc_copy->im = cmc->im;*/
6042 GNUNET_memcpy (mh_copy, mh, size);
6043
6044 rbe->mh = mh_copy;
6045
/* NOTE(review): the condition for this block is not visible here;
   the block releases the entry currently stored at the head slot
   before that slot is overwritten below. */
6047 {
6048 struct RingBufferEntry *rbe_old = ring_buffer[ring_buffer_head];
6049 GNUNET_free (rbe_old->cmc);
6050 GNUNET_free (rbe_old->mh);
6051 GNUNET_free (rbe_old);
6052 }
6053 ring_buffer[ring_buffer_head] = rbe;// cmc_copy;
6054 // cmc_copy->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6055 cmc->mh = (const struct GNUNET_MessageHeader *) mh_copy;
6057 "Storing message for %s and type %u (%u) in ring buffer head %u is full %u\n",
6058 GNUNET_i2s (&cmc->im.sender),
6059 (unsigned int) ntohs (mh->type),
6060 (unsigned int) ntohs (mh_copy->type),
6064 {
6065 ring_buffer_head = 0;
6067 }
6068 else
6070
6072 "%u items stored in ring buffer\n",
6075
6076 /*GNUNET_break_op (0);
6077 GNUNET_STATISTICS_update (GST_stats,
6078 "# CORE messages dropped (virtual link still down)",
6079 1,
6080 GNUNET_NO);
6081
6082 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
6083 "CORE messages of type %u with %u bytes dropped (virtual link still down)\n",
6084 (unsigned int) ntohs (mh->type),
6085 (unsigned int) ntohs (mh->size));
6086 finish_cmc_handling (cmc);*/
6089 // GNUNET_free (cmc);
6090 return;
6091 }
6093}
6094
6095
/**
 * Sanity-check a fragment box `fb` before it is processed.
 *
 * A fragment is accepted only if its payload is non-empty, the payload
 * ends at or before the announced total message size, and the fragment
 * offset lies inside the message.
 *
 * NOTE(review): the line with the function name and parameters is
 * missing from this excerpt.  `bsize` is computed as
 * `size - sizeof(*fb)` without checking `size >= sizeof(*fb)` here;
 * presumably the caller guarantees the minimum size -- confirm.
 *
 * @return #GNUNET_YES if the fragment is well-formed,
 *         #GNUNET_SYSERR otherwise
 */
6103static int
6105{
6106 uint16_t size = ntohs (fb->header.size);
6107 uint16_t bsize = size - sizeof(*fb);
6108
6109 (void) cls;
6110 if (0 == bsize)
6111 {
6112 GNUNET_break_op (0);
6113 return GNUNET_SYSERR;
6114 }
/* payload must not extend past the end of the full message */
6115 if (bsize + ntohs (fb->frag_off) > ntohs (fb->msg_size))
6116 {
6117 GNUNET_break_op (0);
6118 return GNUNET_SYSERR;
6119 }
/* offset must be inside the full message */
6120 if (ntohs (fb->frag_off) >= ntohs (fb->msg_size))
6121 {
6122 GNUNET_break_op (0);
6123 return GNUNET_SYSERR;
6124 }
6125 return GNUNET_YES;
6126}
6127
6128
/**
 * Release an `struct AcknowledgementCummulator` that has no ACKs
 * pending.
 *
 * Clears the task handle, asserts that `num_acks` is zero and frees the
 * cummulator.
 *
 * NOTE(review): the function name line and the statement between the
 * assertion and the free are missing from this excerpt; presumably the
 * latter removes the cummulator from a lookup structure -- confirm
 * against the full source.
 *
 * The body reads `cls` as the `struct AcknowledgementCummulator *`.
 */
6134static void
6136{
6137 struct AcknowledgementCummulator *ac = cls;
6138
6139 ac->task = NULL;
6140 GNUNET_assert (0 == ac->num_acks);
6142 GNUNET_YES ==
6144 GNUNET_free (ac);
6145}
6146
6147
/**
 * Build one `struct TransportReliabilityAckMessage` from all ACKs
 * collected in the cummulator and send it to `ac->target`.
 *
 * The message is assembled in a stack buffer sized from `ac->num_acks`.
 * It is sent over the virtual link if a confirmed one exists, otherwise
 * via the neighbour entry if there is one.  Afterwards `num_acks` is
 * reset to zero.
 *
 * NOTE(review): the function name line, the declaration of `ap`, part
 * of the loop body, the send calls and the final statement (which takes
 * `ac` as its last argument) are missing from this excerpt -- confirm
 * details against the full source.
 *
 * The body reads `cls` as the `struct AcknowledgementCummulator *`.
 */
6153static void
6155{
6156 struct Neighbour *n;
6157 struct VirtualLink *vl;
6158 struct AcknowledgementCummulator *ac = cls;
6159 char buf[sizeof(struct TransportReliabilityAckMessage)
6160 + ac->num_acks
6162 struct TransportReliabilityAckMessage *ack =
6163 (struct TransportReliabilityAckMessage *) buf;
6165
6166 ac->task = NULL;
6168 "Sending ACK with %u components to %s\n",
6169 ac->num_acks,
6170 GNUNET_i2s (&ac->target));
6171 GNUNET_assert (0 < ac->num_acks);
6173 ack->header.size =
6174 htons (sizeof(*ack)
6175 + ac->num_acks * sizeof(struct TransportCummulativeAckPayloadP));
/* ack_counter is advanced by the number of ACKs in this message and the
   new value is what gets transmitted. */
6176 ack->ack_counter = htonl (ac->ack_counter += ac->num_acks);
6177 ap = (struct TransportCummulativeAckPayloadP *) &ack[1];
6178 for (unsigned int i = 0; i < ac->num_acks; i++)
6179 {
6180 ap[i].ack_uuid = ac->ack_uuids[i].ack_uuid;
6183 }
6184 /*route_control_message_without_fc (
6185 &ac->target,
6186 &ack->header,
6187 RMO_DV_ALLOWED);*/
6188 vl = lookup_virtual_link (&ac->target);
6189 if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
6190 {
6192 vl,
6193 &ack->header,
6195 }
6196 else
6197 {
6198 /* Use route via neighbour */
6199 n = lookup_neighbour (&ac->target);
6200 if (NULL != n)
6202 n,
6203 &ack->header,
6204 RMO_NONE);
6205 }
6206 ac->num_acks = 0;
6209 ac);
6210}
6211
6212
/**
 * Queue the acknowledgement @a ack_uuid for transmission to the peer
 * `pid`, batching it with other pending ACKs for that peer.
 *
 * If no cummulator exists yet for the peer, one is set up with
 * `target = *pid` and `min_transmission_time = max_delay`.  If the
 * existing cummulator already holds #MAX_CUMMULATIVE_ACKS entries, it
 * must be run immediately.  The new UUID is then appended to
 * `ac->ack_uuids`.
 *
 * NOTE(review): the function name line with the first parameter
 * (presumably the `pid` used below), the cummulator lookup/creation,
 * and the scheduling statements are missing from this excerpt --
 * confirm against the full source.
 *
 * @param ack_uuid UUID to acknowledge
 * @param max_delay stored as `min_transmission_time` when a new
 *        cummulator is created
 */
6221static void
6223 const struct AcknowledgementUUIDP *ack_uuid,
6224 struct GNUNET_TIME_Absolute max_delay)
6225{
6226 struct AcknowledgementCummulator *ac;
6227
6229 "Scheduling ACK %s for transmission to %s\n",
6230 GNUNET_uuid2s (&ack_uuid->value),
6231 GNUNET_i2s (pid));
6233 if (NULL == ac)
6234 {
6236 ac->target = *pid;
6237 ac->min_transmission_time = max_delay;
6241 &ac->target,
6242 ac,
6244 }
6245 else
6246 {
6247 if (MAX_CUMMULATIVE_ACKS == ac->num_acks)
6248 {
6249 /* must run immediately, ack buffer full! */
6251 }
6255 }
/* append the new UUID to the batch */
6258 ac->ack_uuids[ac->num_acks].ack_uuid = *ack_uuid;
6259 ac->num_acks++;
6262 ac);
6263}
6264
6265
/* NOTE(review): the header and the member declarations of this
   aggregate are missing from this excerpt.  Presumably this is
   `struct FindByMessageUuidContext`, whose members `message_uuid` and
   `rc` are used by find_by_message_uuid() -- confirm against the full
   source. */
6270{
6275
6280};
6281
6282
6292static int
6293find_by_message_uuid (void *cls, uint32_t key, void *value)
6294{
6295 struct FindByMessageUuidContext *fc = cls;
6296 struct ReassemblyContext *rc = value;
6297
6298 (void) key;
6299 if (0 == GNUNET_memcmp (&fc->message_uuid, &rc->msg_uuid))
6300 {
6301 fc->rc = rc;
6302 return GNUNET_NO;
6303 }
6304 return GNUNET_YES;
6305}
6306
6307
/**
 * Process a fragment `fb` received from a communicator: copy it into
 * the matching reassembly buffer, schedule an ACK, and, once no bytes
 * are missing, continue processing with the reassembled message.
 *
 * Without a confirmed virtual link to the sender the fragment is
 * rejected.  A new `struct ReassemblyContext` (payload buffer plus one
 * bit per payload byte) is allocated when no context with the same
 * message UUID is found.
 *
 * NOTE(review): the function name line and a number of call lines
 * (logging, map/heap management, scheduling, the final hand-over and
 * cleanup) are missing from this excerpt -- confirm against the full
 * source.
 *
 * The body reads `cls` as the `struct CommunicatorMessageContext *` and
 * `fb` as the fragment box.
 */
6315static void
6317{
6318 struct CommunicatorMessageContext *cmc = cls;
6319 struct VirtualLink *vl;
6320 struct ReassemblyContext *rc;
6321 const struct GNUNET_MessageHeader *msg;
6322 uint16_t msize;
6323 uint16_t fsize;
6324 uint16_t frag_off;
6325 char *target;
6326 struct GNUNET_TIME_Relative cdelay;
6327 struct FindByMessageUuidContext fc;
6328
6329 vl = lookup_virtual_link (&cmc->im.sender);
6330 if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
6331 {
6332 struct GNUNET_SERVICE_Client *client = cmc->tc->client;
6333
6335 "No virtual link for %s to handle fragment\n",
6336 GNUNET_i2s (&cmc->im.sender));
6337 GNUNET_break (0);
6338 finish_cmc_handling (cmc);
6340 return;
6341 }
/* the reassembly data structures of the link are set up on first use */
6342 if (NULL == vl->reassembly_map)
6343 {
6345 vl->reassembly_heap =
6350 vl);
6351 }
6352 msize = ntohs (fb->msg_size);
6353 fc.message_uuid = fb->msg_uuid;
6354 fc.rc = NULL;
6356 fb->msg_uuid.uuid,
6358 &fc);
6359 fsize = ntohs (fb->header.size) - sizeof(*fb);
6360 if (NULL == (rc = fc.rc))
6361 {
6362 rc = GNUNET_malloc (sizeof(*rc) + msize /* reassembly payload buffer */
6363 + (msize + 7) / 8 * sizeof(uint8_t) /* bitfield */);
6364 rc->msg_uuid = fb->msg_uuid;
6365 rc->virtual_link = vl;
6366 rc->msg_size = msize;
6367 rc->reassembly_timeout =
6371 rc,
6375 vl->reassembly_map,
6376 rc->msg_uuid.uuid,
6377 rc,
/* layout of the allocation: context, then payload, then bitfield */
6379 target = (char *) &rc[1];
6380 rc->bitfield = (uint8_t *) (target + rc->msg_size);
/* NOTE(review): when fsize == msg_size, msg_missing starts at 0, yet the
   bitfield loop below still decrements it for every newly set bit;
   presumably the bitfield starts zeroed, in which case the counter
   would wrap instead of ending at 0 -- confirm. */
6381 if (fsize != rc->msg_size)
6382 rc->msg_missing = rc->msg_size;
6383 else
6384 rc->msg_missing = 0;
6386 "Received fragment with size %u at offset %u/%u %u bytes missing from %s for NEW message %"
6387 PRIu64 "\n",
6388 fsize,
6389 ntohs (fb->frag_off),
6390 msize,
6391 rc->msg_missing,
6392 GNUNET_i2s (&cmc->im.sender),
6393 fb->msg_uuid.uuid);
6394 }
6395 else
6396 {
6397 target = (char *) &rc[1];
6399 "Received fragment at offset %u/%u from %s for message %u\n",
6400 ntohs (fb->frag_off),
6401 msize,
6402 GNUNET_i2s (&cmc->im.sender),
6403 (unsigned int) fb->msg_uuid.uuid);
6404 }
/* all fragments of one message must announce the same total size */
6405 if (msize != rc->msg_size)
6406 {
6407 GNUNET_break (0);
6408 finish_cmc_handling (cmc);
6409 return;
6410 }
6411
6412 /* reassemble */
6413 if (0 == fsize)
6414 {
6415 GNUNET_break (0);
6416 finish_cmc_handling (cmc);
6417 return;
6418 }
6419 frag_off = ntohs (fb->frag_off);
6420 if (frag_off + fsize > msize)
6421 {
6422 /* Fragment (plus fragment size) exceeds message size! */
6423 GNUNET_break_op (0);
6424 finish_cmc_handling (cmc);
6425 return;
6426 }
6427 memcpy (&target[frag_off], &fb[1], fsize);
6428 /* update bitfield and msg_missing */
6429 for (unsigned int i = frag_off; i < frag_off + fsize; i++)
6430 {
6431 if (0 == (rc->bitfield[i / 8] & (1 << (i % 8))))
6432 {
6433 rc->bitfield[i / 8] |= (1 << (i % 8));
6434 rc->msg_missing--;
6435 }
6436 }
6437
6438 /* Compute cumulative ACK */
/* the delay is scaled by the number of fragment-sized pieces still
   missing; fsize is non-zero here (checked above) */
6440 cdelay = GNUNET_TIME_relative_multiply (cdelay, rc->msg_missing / fsize);
6441 if (0 == rc->msg_missing)
6442 cdelay = GNUNET_TIME_UNIT_ZERO;
6443 cummulative_ack (&cmc->im.sender,
6444 &fb->ack_uuid,
6447 /* is reassembly complete? */
6448 if (0 != rc->msg_missing)
6449 {
6450 finish_cmc_handling (cmc);
6451 return;
6452 }
6453 /* reassembly is complete, verify result */
6454 msg = (const struct GNUNET_MessageHeader *) &rc[1];
6455 if (ntohs (msg->size) != rc->msg_size)
6456 {
6457 GNUNET_break (0);
6459 finish_cmc_handling (cmc);
6460 return;
6461 }
6462 /* successful reassembly */
6464 "Fragment reassembly complete for message %u\n",
6465 (unsigned int) fb->msg_uuid.uuid);
6466 /* FIXME: check that the resulting msg is NOT a
6467 DV Box or Reliability Box, as that is NOT allowed! */
6468 cmc->mh = msg;
6470 /* FIXME-OPTIMIZE: really free here? Might be bad if fragments are still
6471 en-route and we forget that we finished this reassembly immediately!
6472 -> keep around until timeout?
6473 -> shorten timeout based on ACK? */
6475}
6476
6477
/**
 * Check a reliability box @a rb before it is handled.
 *
 * The visible part logs the outer size and the type and size of the
 * message that directly follows the box header, then returns
 * #GNUNET_YES.
 *
 * NOTE(review): the function name line, the start of the log call and
 * the statement just before the return are missing from this excerpt;
 * presumably the latter validates the boxed message -- confirm against
 * the full source.
 *
 * @param rb the reliability box to check
 * @return #GNUNET_YES on the visible path
 */
6485static int
6487 const struct TransportReliabilityBoxMessage *rb)
6488{
6489 (void) cls;
6490 const struct GNUNET_MessageHeader *inbox = (const struct
6491 GNUNET_MessageHeader *) &rb[1];
6492
6494 "check_send_msg with size %u: inner msg type %u and size %u (%lu %lu)\n",
6495 ntohs (rb->header.size),
6496 ntohs (inbox->type),
6497 ntohs (inbox->size),
6498 sizeof (struct TransportReliabilityBoxMessage),
6499 sizeof (struct GNUNET_MessageHeader));
6501 return GNUNET_YES;
6502}
6503
6504
/**
 * Handle a reliability box @a rb: arrange for its `ack_uuid` to be
 * acknowledged to the sender and continue with the message contained in
 * the box.
 *
 * The ACK delay depends on whether `rb->ack_countdown` is zero; the
 * non-zero case derives it from a fixed one-second "RTT" divided by 8
 * (see the FIXMEs below).
 *
 * NOTE(review): the function name line, the start of the log and ACK
 * calls, and the final statement that processes `cmc->mh` are missing
 * from this excerpt -- confirm against the full source.
 *
 * The body reads `cls` as the `struct CommunicatorMessageContext *`.
 *
 * @param rb the reliability box received
 */
6512static void
6514 const struct TransportReliabilityBoxMessage *rb)
6515{
6516 struct CommunicatorMessageContext *cmc = cls;
6517 const struct GNUNET_MessageHeader *inbox =
6518 (const struct GNUNET_MessageHeader *) &rb[1];
6519 struct GNUNET_TIME_Relative rtt;
6520
6522 "Received reliability box from %s with UUID %s of type %u\n",
6523 GNUNET_i2s (&cmc->im.sender),
6525 (unsigned int) ntohs (inbox->type));
6526 rtt = GNUNET_TIME_UNIT_SECONDS; /* FIXME: should base this on "RTT", but we
6527 do not really have an RTT for the
6528 * incoming* queue (should we have
6529 the sender add it to the rb message?) */
6531 &cmc->im.sender,
6532 &rb->ack_uuid,
6533 (0 == ntohl (rb->ack_countdown))
6536 GNUNET_TIME_relative_divide (rtt, 8 /* FIXME: magic constant */)));
6537 /* continue with inner message */
6538 /* FIXME: check that inbox is NOT a DV Box, fragment or another
6539 reliability box (not allowed!) */
6540 cmc->mh = inbox;
6542}
6543
6544
6553static void
6554update_pd_age (struct PerformanceData *pd, unsigned int age)
6555{
6556 unsigned int sage;
6557
6558 if (age == pd->last_age)
6559 return; /* nothing to do */
6560 sage = GNUNET_MAX (pd->last_age, age - 2 * GOODPUT_AGING_SLOTS);
6561 for (unsigned int i = sage; i <= age - GOODPUT_AGING_SLOTS; i++)
6562 {
6563 struct TransmissionHistoryEntry *the = &pd->the[i % GOODPUT_AGING_SLOTS];
6564
6565 the->bytes_sent = 0;
6566 the->bytes_received = 0;
6567 }
6568 pd->last_age = age;
6569}
6570
6571
/**
 * Fold a new RTT measurement and a count of successfully transmitted
 * bytes into the performance data `pd`.
 *
 * The aged RTT is replaced by @a rtt if it was still "forever";
 * otherwise it becomes (rtt + 7 * old) / 8.  The byte count is added to
 * `bytes_received` of the history slot for the current age.
 *
 * NOTE(review): the line with the function name and the first parameter
 * (presumably `struct PerformanceData *pd`, as used below) is missing
 * from this excerpt -- confirm against the full source.
 *
 * @param rtt round-trip time just measured
 * @param bytes_transmitted_ok number of bytes to credit
 */
6580static void
6582 struct GNUNET_TIME_Relative rtt,
6583 uint16_t bytes_transmitted_ok)
6584{
6585 uint64_t nval = rtt.rel_value_us;
6586 uint64_t oval = pd->aged_rtt.rel_value_us;
6587 unsigned int age = get_age ();
6588 struct TransmissionHistoryEntry *the = &pd->the[age % GOODPUT_AGING_SLOTS];
6589
6590 if (oval == GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us)
6591 pd->aged_rtt = rtt;
6592 else
6593 pd->aged_rtt.rel_value_us = (nval + 7 * oval) / 8;
/* age the history first, then credit the bytes to the current slot */
6594 update_pd_age (pd, age);
6595 the->bytes_received += bytes_transmitted_ok;
6596}
6597
6598
/**
 * Record a successful transmission in the performance data of the
 * queue `q` by forwarding to update_performance_data() on `q->pd`.
 *
 * NOTE(review): the line with the function name and the first parameter
 * (the `q` used below) is missing from this excerpt -- confirm against
 * the full source.
 *
 * @param rtt round-trip time just measured
 * @param bytes_transmitted_ok number of bytes to credit
 */
6606static void
6608 struct GNUNET_TIME_Relative rtt,
6609 uint16_t bytes_transmitted_ok)
6610{
6611 update_performance_data (&q->pd, rtt, bytes_transmitted_ok);
6612}
6613
6614
/**
 * Record a successful transmission in the performance data of `dvh` by
 * forwarding to update_performance_data() on `dvh->pd`.
 *
 * NOTE(review): the line with the function name and the first parameter
 * (the `dvh` used below) is missing from this excerpt -- confirm against
 * the full source.
 *
 * @param rtt round-trip time just measured
 * @param bytes_transmitted_ok number of bytes to credit
 */
6622static void
6624 struct GNUNET_TIME_Relative rtt,
6625 uint16_t bytes_transmitted_ok)
6626{
6627 update_performance_data (&dvh->pd, rtt, bytes_transmitted_ok);
6628}
6629
6630
/**
 * React to the completed transmission of the pending message `pm`,
 * dispatching on its type `pm->pmt`.
 *
 * For #PMT_FRAGMENT_BOX the fragment tree is walked upwards while the
 * current node has no fragments left and its `frag_off` has reached the
 * end of its payload.  For #PMT_DV_BOX the parent's `bpm` pointer is
 * cleared if there is a parent.
 *
 * NOTE(review): the function name line and the statements that act on
 * the messages (presumably freeing them / notifying the client) are
 * missing from this excerpt -- confirm against the full source.
 */
6638static void
6640{
6641 struct PendingMessage *pos;
6642
6644 "Complete transmission of message %" PRIu64 " %u\n",
6645 pm->logging_uuid,
6646 pm->pmt);
6647 switch (pm->pmt)
6648 {
6649 case PMT_CORE:
6651 /* Full message sent, we are done */
6653 return;
6654
6655 case PMT_FRAGMENT_BOX:
6656 /* Fragment sent over reliable channel */
6657 pos = pm->frag_parent;
6661 "pos frag_off %lu pos bytes_msg %lu pmt %u parent %u\n",
6662 (unsigned long) pos->frag_off,
6663 (unsigned long) pos->bytes_msg,
6664 pos->pmt,
6665 NULL == pos->frag_parent ? 1 : 0);
6666 /* check if subtree is done */
6667 while ((NULL == pos->head_frag) && (pos->frag_off == (pos->bytes_msg
6668 - sizeof(struct
6670 &&
6671 (NULL != pos->frag_parent))
6672 {
/* climb one level: the finished node becomes pm, its parent pos */
6673 pm = pos;
6674 pos = pm->frag_parent;
6675 if ((NULL == pos) && (PMT_DV_BOX == pm->pmt))
6676 {
6678 return;
6679 }
6680 else if (PMT_DV_BOX == pm->pmt)
6681 {
6683 return;
6684 }
6687 }
6688
6689 /* Was this the last applicable fragment? */
6690 if ((NULL == pos->head_frag) && (NULL == pos->frag_parent || PMT_DV_BOX ==
6691 pos->pmt) &&
6692 (pos->frag_off == pos->bytes_msg))
6694 return;
6695
6696 case PMT_DV_BOX:
6698 "Completed transmission of message %" PRIu64 " (DV Box)\n",
6699 pm->logging_uuid);
6700 if (NULL != pm->frag_parent)
6701 {
6702 pos = pm->frag_parent;
6704 pos->bpm = NULL;
6706 }
6707 else
6709 return;
6710 }
6711}
6713
/**
 * Process the acknowledgement of the pending acknowledgement `pa`.
 *
 * The measured delay has @a ack_delay subtracted.  Performance data is
 * only updated when the message was sent exactly once
 * (`1 == pa->num_send`); the DV hop, if any, is credited with
 * `pa->message_size` bytes.
 *
 * NOTE(review): the function name line with the first parameter (the
 * `pa` used below), the initial computation of `delay`, the queue
 * update and the handling of `pa->pm` are missing from this excerpt --
 * confirm against the full source.
 *
 * @param ack_delay delay subtracted from the measured delay
 */
6721static void
6723 struct GNUNET_TIME_Relative ack_delay)
6724{
6725 struct GNUNET_TIME_Relative delay;
6726
6728 delay = GNUNET_TIME_relative_subtract (delay, ack_delay);
6729 if (NULL != pa->queue && 1 == pa->num_send)
6731 if (NULL != pa->dvh && 1 == pa->num_send)
6732 update_dvh_performance (pa->dvh, delay, pa->message_size);
6733 if (NULL != pa->pm)
6736}
6737
6738
/**
 * Sanity-check a reliability ACK @a ra before it is handled.
 *
 * The bytes after the fixed header must hold at least one
 * `struct TransportCummulativeAckPayloadP` and be an exact multiple of
 * that structure's size.
 *
 * NOTE(review): the function name line is missing from this excerpt.
 * The payload length is computed as `header.size - sizeof(*ra)` without
 * a lower-bound check here; presumably the caller guarantees the
 * minimum size -- confirm.
 *
 * @param ra the ACK message to check
 * @return #GNUNET_OK if well-formed, #GNUNET_SYSERR otherwise
 */
6746static int
6748 const struct TransportReliabilityAckMessage *ra)
6749{
6750 unsigned int n_acks;
6751
6752 (void) cls;
6753 n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6754 / sizeof(struct TransportCummulativeAckPayloadP);
6755 if (0 == n_acks)
6756 {
6757 GNUNET_break_op (0);
6758 return GNUNET_SYSERR;
6759 }
/* reject trailing bytes that do not form a complete payload entry */
6760 if ((ntohs (ra->header.size) - sizeof(*ra)) !=
6761 n_acks * sizeof(struct TransportCummulativeAckPayloadP))
6762 {
6763 GNUNET_break_op (0);
6764 return GNUNET_SYSERR;
6765 }
6766 return GNUNET_OK;
6767}
6768
6769
/**
 * Handle a reliability ACK @a ra: for every contained ACK payload, find
 * the matching pending acknowledgement and process it via
 * handle_acknowledged(); unknown UUIDs are only counted and logged.
 * Finally the communicator message context is finished.
 *
 * NOTE(review): the function name line, the lookup of `pa` and the
 * starts of the log/statistics calls are missing from this excerpt --
 * confirm against the full source.
 *
 * The body reads `cls` as the `struct CommunicatorMessageContext *`.
 *
 * @param ra the ACK message received
 */
6777static void
6779 const struct TransportReliabilityAckMessage *ra)
6780{
6781 struct CommunicatorMessageContext *cmc = cls;
6782 const struct TransportCummulativeAckPayloadP *ack;
6783 unsigned int n_acks;
6784 uint32_t ack_counter;
6785
6786 n_acks = (ntohs (ra->header.size) - sizeof(*ra))
6787 / sizeof(struct TransportCummulativeAckPayloadP);
6788 ack = (const struct TransportCummulativeAckPayloadP *) &ra[1];
6789 for (unsigned int i = 0; i < n_acks; i++)
6790 {
6791 struct PendingAcknowledgement *pa =
6793 if (NULL == pa)
6794 {
6796 "Received ACK from %s with UUID %s which is unknown to us!\n",
6797 GNUNET_i2s (&cmc->im.sender),
6798 GNUNET_uuid2s (&ack[i].ack_uuid.value));
6800 GST_stats,
6801 "# FRAGMENT_ACKS dropped, no matching pending message",
6802 1,
6803 GNUNET_NO);
6804 continue;
6805 }
6807 "Received ACK from %s with UUID %s\n",
6808 GNUNET_i2s (&cmc->im.sender),
6809 GNUNET_uuid2s (&ack[i].ack_uuid.value));
6810 handle_acknowledged (pa, GNUNET_TIME_relative_ntoh (ack[i].ack_delay));
6811 }
6812
/* NOTE(review): this converts a received (network byte order) field
   with htonl(); ntohl() would express the intent.  The value is
   currently unused, so there is no functional effect. */
6813 ack_counter = htonl (ra->ack_counter);
6814 (void) ack_counter; /* silence compiler warning for now */
6815 // FIXME-OPTIMIZE: track ACK losses based on ack_counter somewhere!
6816 // (DV and/or Neighbour?)
6817 finish_cmc_handling (cmc);
6818}
6819
6820
/**
 * Sanity-check a backchannel encapsulation `be` before it is handled.
 *
 * The payload must consist of an inner message (whose size is strictly
 * smaller than the payload) followed by at least one more byte, and the
 * last payload byte must be a 0-terminator.
 *
 * NOTE(review): the function name line and the line declaring `be` are
 * missing from this excerpt.  `inbox->size` is read without first
 * checking that the payload is at least a message header long;
 * presumably the caller guarantees that -- confirm.
 *
 * @param cls unused
 * @return #GNUNET_YES if well-formed, #GNUNET_SYSERR otherwise
 */
6828static int
6830 void *cls,
6832{
6833 uint16_t size = ntohs (be->header.size) - sizeof(*be);
6834 const struct GNUNET_MessageHeader *inbox =
6835 (const struct GNUNET_MessageHeader *) &be[1];
6836 const char *is;
6837 uint16_t isize;
6838
6839 (void) cls;
6840 if (ntohs (inbox->size) >= size)
6841 {
6842 GNUNET_break_op (0);
6843 return GNUNET_SYSERR;
6844 }
6845 isize = ntohs (inbox->size);
/* 'is' points just past the inner message; 'size' becomes the number of
   remaining bytes (at least 1 because of the check above) */
6846 is = ((const char *) inbox) + isize;
6847 size -= isize;
6848 if ('\0' != is[size - 1])
6849 {
6850 GNUNET_break_op (0);
6851 return GNUNET_SYSERR;
6852 }
6853 return GNUNET_YES;
6854}
6855
6856
6865static void
6867 void *cls,
6869{
6870 struct CommunicatorMessageContext *cmc = cls;
6872 struct GNUNET_MQ_Envelope *env;
6873 struct TransportClient *tc;
6874 const struct GNUNET_MessageHeader *inbox =
6875 (const struct GNUNET_MessageHeader *) &be[1];
6876 uint16_t isize = ntohs (inbox->size);
6877 const char *target_communicator = ((const char *) inbox) + isize;
6878 char *sender;
6879 char *self;
6880
6881 GNUNET_asprintf (&sender,
6882 "%s",
6883 GNUNET_i2s (&cmc->im.sender));
6884 GNUNET_asprintf (&self,
6885 "%s",
6887
6888 /* Find client providing this communicator */
6889 for (tc = clients_head; NULL != tc; tc = tc->next)
6890 if ((CT_COMMUNICATOR == tc->type) &&
6891 (0 ==
6892 strcmp (tc->details.communicator.address_prefix, target_communicator)))
6893 break;
6894 if (NULL == tc)
6895 {
6896 char *stastr;
6897
6899 &stastr,
6900 "# Backchannel message dropped: target communicator `%s' unknown",
6901 target_communicator);
6903 GNUNET_free (stastr);
6904 finish_cmc_handling (cmc);
6905 return;
6906 }
6907 /* Finally, deliver backchannel message to communicator */
6909 "Delivering backchannel message from %s to %s of type %u to %s\n",
6910 sender,
6911 self,
6912 ntohs (inbox->type),
6913 target_communicator);
6915 cbi,
6916 isize,
6918 cbi->pid = cmc->im.sender;