34#include <pulse/simple.h>
35#include <pulse/error.h>
36#include <pulse/rtclock.h>
38#include <pulse/pulseaudio.h>
40#include <opus/opus_types.h>
43#define DEBUG_READ_PURE_OGG 1
44#define DEBUG_DUMP_DECODED_OGG 1
48#define SAMPLING_RATE 48000
53#define MAX_FRAME_SIZE (960 * 6)
59 .format = PA_SAMPLE_FLOAT32LE,
64#ifdef DEBUG_DUMP_DECODED_OGG
76static pa_threaded_mainloop *
m;
91static OpusDecoder *
dec;
111static ogg_sync_state
oy;
116static ogg_stream_state
os;
152 if (((
unsigned int)
op->bytes) <
sizeof(header))
162 "Header: v%u, %u-ch, skip %u, %uHz, %u gain\n",
174 "This implementation does not support non-mono streams\n");
182 "Cannot create encoder: %s\n",
183 opus_strerror (err));
189 "Decoder initialization failed: %s\n",
190 opus_strerror (err));
194 if (0 != header.
gain)
199 int gainadj = (
int) header.
gain;
200 err = opus_decoder_ctl (
dec, OPUS_SET_GAIN (gainadj));
201 if (OPUS_UNIMPLEMENTED == err)
203 gain = pow (10.0, gainadj / 5120.0);
205 else if (OPUS_OK != err)
207 fprintf (stderr,
"Error setting gain: %s\n", opus_strerror (err));
216#ifdef DEBUG_DUMP_DECODED_OGG
220 unsigned char buf[4];
222 buf[0] = (
unsigned char) (i32 & 0xFF);
223 buf[1] = (
unsigned char) (i32 >> 8 & 0xFF);
224 buf[2] = (
unsigned char) (i32 >> 16 & 0xFF);
225 buf[3] = (
unsigned char) (i32 >> 24 & 0xFF);
226 return fwrite (buf, 4, 1, file);
233 unsigned char buf[2];
235 buf[0] = (
unsigned char) (i16 & 0xFF);
236 buf[1] = (
unsigned char) (i16 >> 8 & 0xFF);
237 return fwrite (buf, 2, 1, file);
247 ret = fprintf (file,
"RIFF") >= 0;
250 ret &= fprintf (file,
"WAVEfmt ") >= 0;
259 ret &= fprintf (file,
"data") >= 0;
262 return !
ret ? -1 : 16;
278#ifdef DEBUG_DUMP_DECODED_OGG
279 static int wrote_wav_header;
284 wrote_wav_header = 1;
287 maxout = 0 > maxout ? 0 : maxout;
298 to_write = out_len < maxout ? out_len : (unsigned) maxout;
304 "Writing %u * %u * %u = %llu bytes into PA\n",
307 (
unsigned int)
sizeof(
float),
308 (
unsigned long long) (to_write *
channels *
sizeof(
float)));
309#ifdef DEBUG_DUMP_DECODED_OGG
312# define fminf(_x, _y) ((_x) < (_y) ? (_x) : (_y))
313# define fmaxf(_x, _y) ((_x) > (_y) ? (_x) : (_y))
314# define float2int(flt) ((int) (floor (.5 + flt)))
321 fwrite (out, 2 *
channels, out_len < maxout ? out_len : maxout, stdout);
327 PA_SEEK_RELATIVE) < 0)
330 _ (
"pa_stream_write() failed: %s\n"),
331 pa_strerror (pa_context_errno (
context)));
340 "Wrote %" PRId64
" samples\n",
362 static int stream_init;
363 int64_t page_granule = 0;
365 static int has_opus_stream;
366 static int has_tags_packet;
367 static int32_t opus_serialno;
368 static int64_t link_out;
369 static int64_t packet_count;
371 static int total_links;
372 static int gran_offset;
374 while (1 == ogg_sync_pageout (&
oy, &og))
376 if (0 == stream_init)
379 "Initialized the stream\n");
380 ogg_stream_init (&
os, ogg_page_serialno (&og));
383 if (ogg_page_serialno (&og) !=
os.serialno)
387 "Re-set serial number\n");
388 ogg_stream_reset_serialno (&
os, ogg_page_serialno (&og));
391 ogg_stream_pagein (&
os, &og);
392 page_granule = ogg_page_granulepos (&og);
394 "Reading page that ends at %" PRId64
"\n",
397 while (1 == ogg_stream_packetout (&
os, &
op))
401 if (
op.b_o_s && (
op.bytes >= 8) && ! memcmp (
op.packet,
"OpusHead", 8))
404 "Got Opus Header\n");
405 if (has_opus_stream && has_tags_packet)
413 opus_decoder_destroy (
dec);
416 "\nWarning: stream %" PRId64
417 " ended without EOS and a new stream began.\n",
418 (int64_t)
os.serialno);
420 if (! has_opus_stream)
422 if ((packet_count > 0) && (opus_serialno ==
os.serialno) )
425 "\nError: Apparent chaining without changing serial number (%"
426 PRId64
"==%" PRId64
").\n",
427 (int64_t) opus_serialno, (int64_t)
os.serialno);
430 opus_serialno =
os.serialno;
438 "Got header for stream %" PRId64
", this is %dth link\n",
439 (int64_t) opus_serialno, total_links);
443 fprintf (stderr,
"\nWarning: ignoring opus stream %" PRId64
"\n",
444 (int64_t)
os.serialno);
447 if (! has_opus_stream || (
os.serialno != opus_serialno) )
454 if (0 == packet_count)
457 "Decoding header\n");
462 if ((0 != ogg_stream_packetout (&
os, &
op)) || (255 ==
463 og.header[og.header_len
471 "Extra packets on initial header page. Invalid stream.\n");
482 "Allocating %u * %u * %u = %llu bytes of buffer space\n",
485 (
unsigned int)
sizeof(
float),
491 else if (1 == packet_count)
494 if ((0 != ogg_stream_packetout (&
os, &
op)) || (255 ==
495 og.header[og.header_len
499 "Extra packets on initial tags page. Invalid stream.\n");
510 if (
op.e_o_s && (
os.serialno == opus_serialno) )
518 ret = opus_decode_float (
dec,
519 (
const unsigned char *)
op.packet,
527 fprintf (stderr,
"Decoding error: %s\n", opus_strerror (
ret));
532 "Decoded %d bytes/channel (%d bytes) from %u compressed bytes\n",
535 (
unsigned int)
op.bytes);
543 "Applying gain %f\n",
552 maxout = ((page_granule - gran_offset) *
SAMPLING_RATE / 48000)
555 "Writing audio packet %" PRId64
", at most %" PRId64
557 packet_count, maxout);
568 opus_decoder_destroy (
dec);
599 data = ogg_sync_buffer (&
oy, payload_len);
602 ogg_sync_wrote (&
oy, payload_len);
629 "Unblocking main loop!\n");
649 _ (
"gnunet-helper-audio-playback - Got signal, exiting\n"));
665 switch (pa_context_get_state (c))
667 case PA_CONTEXT_CONNECTING:
668 case PA_CONTEXT_AUTHORIZING:
669 case PA_CONTEXT_SETTING_NAME:
672 case PA_CONTEXT_READY:
676 _ (
"Connection established.\n"));
678 pa_stream_new (c,
"GNUNET VoIP playback", &
sample_spec, NULL)))
681 _ (
"pa_stream_new() failed: %s\n"),
682 pa_strerror (pa_context_errno (c)));
691 PA_STREAM_ADJUST_LATENCY
692 | PA_STREAM_INTERPOLATE_TIMING
693 | PA_STREAM_AUTO_TIMING_UPDATE,
697 _ (
"pa_stream_connect_playback() failed: %s\n"),
698 pa_strerror (pa_context_errno (c)));
704 case PA_CONTEXT_TERMINATED:
708 case PA_CONTEXT_FAILED:
711 _ (
"Connection failure: %s\n"),
712 pa_strerror (pa_context_errno (c)));
735 if (! (
m = pa_threaded_mainloop_new ()))
738 _ (
"pa_mainloop_new() failed.\n"));
752 _ (
"pa_context_new() failed.\n"));
756 if (pa_context_connect (
context, NULL, 0, NULL) < 0)
759 _ (
"pa_context_connect() failed: %s\n"),
760 pa_strerror (pa_context_errno (
context)));
762 if (pa_threaded_mainloop_start (
m) < 0)
765 _ (
"pa_mainloop_run() failed.\n"));
783 pa_threaded_mainloop_signal (
m,
798 static unsigned long long toff;
804#ifdef DEBUG_READ_PURE_OGG
805 int read_pure_ogg =
getenv (
"GNUNET_READ_PURE_OGG") ? 1 : 0;
823 "Waiting for PulseAudio to be ready.\n");
829#ifdef DEBUG_DUMP_DECODED_OGG
834 ret = read (STDIN_FILENO,
839 "Received %d bytes of audio data (total: %llu)\n",
845 _ (
"Read error from STDIN: %s\n"),
851#ifdef DEBUG_READ_PURE_OGG
856 ogg_sync_wrote (&
oy,
ret);
870 pa_threaded_mainloop_lock (
m);
874 while (pa_operation_get_state (o) == PA_OPERATION_RUNNING)
878 pa_threaded_mainloop_wait (
m);
882 pa_operation_unref (o);
885 pa_threaded_mainloop_unlock (
m);
struct GNUNET_MessageHeader * msg
Constants for network protocols.
static struct GNUNET_ARM_Operation * op
Current operation.
static int ret
Final status code.
static char * data
The data to insert into the dht.
struct GNUNET_MessageStreamTokenizer * stdin_mst
Tokenizer for the data we get from stdin.
static int write_wav_header()
int main(int argc, char *argv[])
The main function for the playback helper.
static GNUNET_NETWORK_STRUCT_END OpusDecoder * process_header(ogg_packet *op)
Process an Opus header and setup the opus decoder based on it.
static void quit(int ret)
Pulseaudio shutdown task.
static pa_context * context
Pulseaudio context.
static pa_mainloop_api * mainloop_api
Pulseaudio mainloop api.
static void drain_callback(pa_stream *s, int success, void *userdata)
static int ready_pipe[2]
Pipe we use to signal the main loop that we are ready to receive.
static OpusDecoder * dec
OPUS decoder.
static void pa_init()
Pulseaudio initialization.
static void stream_write_callback(pa_stream *s, size_t length, void *userdata)
Callback when data is there for playback.
static float * pcm_buffer
PCM data buffer.
static int stdin_receiver(void *cls, const struct GNUNET_MessageHeader *msg)
Message callback.
static size_t fwrite_le32(opus_int32 i32, FILE *file)
static pa_threaded_mainloop * m
Pulseaudio threaded mainloop.
static void context_state_callback(pa_context *c, void *userdata)
Pulseaudio context state callback.
static ogg_sync_state oy
Ogg I/O state.
static size_t fwrite_le16(int i16, FILE *file)
static int64_t audio_write(int64_t maxout)
static pa_sample_spec sample_spec
Pulseaudio specification.
static void ogg_demux_and_decode()
static int dump_to_stdout
static pa_stream * stream_out
Pulseaudio output stream.
static int frame_size
Number of samples for one frame.
static ogg_stream_state os
Ogg stream state.
static void exit_signal_callback(pa_mainloop_api *m, pa_signal_event *e, int sig, void *userdata)
Exit callback for SIGTERM and SIGINT.
static struct GNUNET_OS_Process * p
Helper process we started.
Core service; the main API for encrypted P2P communications.
Constants for network protocols.
#define GNUNET_NETWORK_STRUCT_BEGIN
Define as empty, GNUNET_PACKED should suffice, but this won't work on W32.
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
#define GNUNET_log(kind,...)
#define GNUNET_le32toh(x)
#define GNUNET_le16toh(x)
#define GNUNET_NETWORK_STRUCT_END
Define as empty, GNUNET_PACKED should suffice, but this won't work on W32.
#define GNUNET_memcpy(dst, src, n)
Call memcpy() but check for n being 0 first.
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format.
#define GNUNET_PACKED
gcc-ism to get packed structs.
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
enum GNUNET_GenericReturnValue GNUNET_log_setup(const char *comp, const char *loglevel, const char *logfile)
Setup logging.
#define GNUNET_log_strerror(level, cmd)
Log an error message at log-level 'level' that indicates a failure of the command 'cmd' with the message given by strerror(errno).
@ GNUNET_ERROR_TYPE_ERROR
@ GNUNET_ERROR_TYPE_DEBUG
#define GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO
Message to transmit the audio between helper and speaker/microphone library.
enum GNUNET_GenericReturnValue GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Message to transmit the audio (between client and helpers).
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO.
Handle to a message stream tokenizer.