GNUnet  0.10.x
gnunet-helper-audio-playback-gst.c
Go to the documentation of this file.
1 /*
2  This file is part of GNUnet.
3  Copyright (C) 2013 GNUnet e.V.
4 
5  GNUnet is free software: you can redistribute it and/or modify it
6  under the terms of the GNU Affero General Public License as published
7  by the Free Software Foundation, either version 3 of the License,
8  or (at your option) any later version.
9 
10  GNUnet is distributed in the hope that it will be useful, but
11  WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Affero General Public License for more details.
14 
15  You should have received a copy of the GNU Affero General Public License
16  along with this program. If not, see <http://www.gnu.org/licenses/>.
17 
18  SPDX-License-Identifier: AGPL3.0-or-later
19  */
25 #include "platform.h"
26 #include "gnunet_util_lib.h"
27 #include "gnunet_protocols.h"
28 #include "conversation.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_core_service.h"
31 
32 #include <gst/gst.h>
33 #include <gst/audio/gstaudiobasesrc.h>
34 #include <gst/app/gstappsrc.h>
35 #include <glib.h>
36 
37 #define DEBUG_READ_PURE_OGG 1
38 
42 #define MAXLINE 4096
43 
48 #define BUFFER_TIME 1000
49 
54 #define LATENCY_TIME 1000
55 
60 
64 static GstElement *pipeline;
65 
69 static GstElement *source;
70 
71 static GstElement *demuxer;
72 static GstElement *decoder;
73 static GstElement *conv;
74 static GstElement *resampler;
75 static GstElement *sink;
76 
80 static int abort_read;
81 
82 
83 static void
84 sink_child_added(GstChildProxy *child_proxy,
85  GObject *object,
86  gchar *name,
87  gpointer user_data)
88 {
89  if (GST_IS_AUDIO_BASE_SRC(object))
90  g_object_set(object,
91  "buffer-time", (gint64)BUFFER_TIME,
92  "latency-time", (gint64)LATENCY_TIME,
93  NULL);
94 }
95 
96 
97 static void
98 ogg_pad_added(GstElement *element,
99  GstPad *pad,
100  gpointer data)
101 {
102  GstPad *sinkpad;
103  GstElement *decoder = (GstElement *)data;
104 
105  /* We can now link this pad with the opus-decoder sink pad */
106  sinkpad = gst_element_get_static_pad(decoder, "sink");
107 
108  gst_pad_link(pad, sinkpad);
109 
110  gst_element_link_many(decoder, conv, resampler, sink, NULL);
111 
112  gst_object_unref(sinkpad);
113 }
114 
115 
116 static void
118 {
119  if (NULL != source)
120  gst_app_src_end_of_stream(GST_APP_SRC(source));
121  if (NULL != pipeline)
122  gst_element_set_state(pipeline, GST_STATE_NULL);
123  abort_read = 1;
124 }
125 
126 
127 static gboolean
128 bus_call(GstBus *bus, GstMessage *msg, gpointer data)
129 {
131  "Bus message\n");
132  switch (GST_MESSAGE_TYPE(msg))
133  {
134  case GST_MESSAGE_EOS:
136  "End of stream\n");
137  quit();
138  break;
139 
140  case GST_MESSAGE_ERROR:
141  {
142  gchar *debug;
143  GError *error;
144 
145  gst_message_parse_error(msg, &error, &debug);
146  g_free(debug);
147 
149  "Error: %s\n",
150  error->message);
151  g_error_free(error);
152 
153  quit();
154  break;
155  }
156 
157  default:
158  break;
159  }
160 
161  return TRUE;
162 }
163 
164 
165 static void
167 {
168  quit();
169 }
170 
171 
172 static int
173 feed_buffer_to_gst(const char *audio, size_t b_len)
174 {
175  GstBuffer *b;
176  gchar *bufspace;
177  GstFlowReturn flow;
178 
180  "Feeding %u bytes to GStreamer\n",
181  (unsigned int)b_len);
182 
183  bufspace = g_memdup(audio, b_len);
184  b = gst_buffer_new_wrapped(bufspace, b_len);
185  if (NULL == b)
186  {
188  "Failed to wrap a buffer\n");
189  g_free(bufspace);
190  return GNUNET_SYSERR;
191  }
192  flow = gst_app_src_push_buffer(GST_APP_SRC(source), b);
193  /* They all return GNUNET_OK, because currently player stops when
194  * data stops coming. This might need to be changed for the player
195  * to also stop when pipeline breaks.
196  */
197  switch (flow)
198  {
199  case GST_FLOW_OK:
201  "Fed %u bytes to the pipeline\n",
202  (unsigned int)b_len);
203  break;
204 
205  case GST_FLOW_FLUSHING:
206  /* buffer was dropped, because pipeline state is not PAUSED or PLAYING */
208  "Dropped a buffer\n");
209  break;
210 
211  case GST_FLOW_EOS:
212  /* end of stream */
214  "EOS\n");
215  break;
216 
217  default:
219  "Unexpected push result\n");
220  break;
221  }
222  return GNUNET_OK;
223 }
224 
225 
234 static int
235 stdin_receiver(void *cls,
236  const struct GNUNET_MessageHeader *msg)
237 {
238  struct AudioMessage *audio;
239  size_t b_len;
240 
241  switch (ntohs(msg->type))
242  {
244  audio = (struct AudioMessage *)msg;
245 
246  b_len = ntohs(audio->header.size) - sizeof(struct AudioMessage);
247  feed_buffer_to_gst((const char *)&audio[1], b_len);
248  break;
249 
250  default:
251  break;
252  }
253  return GNUNET_OK;
254 }
255 
256 
257 int
258 main(int argc, char **argv)
259 {
260  GstBus *bus;
261  guint bus_watch_id;
262  uint64_t toff;
263 
264  typedef void (*SignalHandlerPointer) (int);
265 
266  SignalHandlerPointer inthandler, termhandler;
267 #ifdef DEBUG_READ_PURE_OGG
268  int read_pure_ogg = getenv("GNUNET_READ_PURE_OGG") ? 1 : 0;
269 #endif
270 
271  inthandler = signal(SIGINT,
272  &signalhandler);
273  termhandler = signal(SIGTERM,
274  &signalhandler);
275 
276  /* Initialisation */
277  gst_init(&argc, &argv);
278 
280  GNUNET_log_setup("gnunet-helper-audio-playback-gst",
281  "WARNING",
282  NULL));
283 
285  "Audio sink starts\n");
286 
287  stdin_mst = GNUNET_MST_create(&stdin_receiver,
288  NULL);
289 
290  /* Create gstreamer elements */
291  pipeline = gst_pipeline_new("audio-player");
292  source = gst_element_factory_make("appsrc", "audio-input");
293  demuxer = gst_element_factory_make("oggdemux", "ogg-demuxer");
294  decoder = gst_element_factory_make("opusdec", "opus-decoder");
295  conv = gst_element_factory_make("audioconvert", "converter");
296  resampler = gst_element_factory_make("audioresample", "resampler");
297  sink = gst_element_factory_make("autoaudiosink", "audiosink");
298 
299  if (!pipeline || !source || !conv || !resampler || !decoder || !demuxer || !sink)
300  {
302  "One element could not be created. Exiting.\n");
303  return -1;
304  }
305 
306  g_signal_connect(sink,
307  "child-added",
308  G_CALLBACK(sink_child_added),
309  NULL);
310  g_signal_connect(demuxer,
311  "pad-added",
312  G_CALLBACK(ogg_pad_added),
313  decoder);
314 
315  /* Keep a reference to it, we operate on it */
316  gst_object_ref(GST_OBJECT(source));
317 
318  /* Set up the pipeline */
319 
320  /* we feed appsrc as fast as possible, it just blocks when it's full */
321  g_object_set(G_OBJECT(source),
322 /* "format", GST_FORMAT_TIME,*/
323  "block", TRUE,
324  "is-live", TRUE,
325  NULL);
326 
327  g_object_set(G_OBJECT(decoder),
328 /* "plc", FALSE,*/
329 /* "apply-gain", TRUE,*/
330  "use-inband-fec", TRUE,
331  NULL);
332 
333  /* we add a message handler */
334  bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
335  bus_watch_id = gst_bus_add_watch(bus, bus_call, pipeline);
336  gst_object_unref(bus);
337 
338  /* we add all elements into the pipeline */
339  /* audio-input | ogg-demuxer | opus-decoder | converter | resampler | audiosink */
340  gst_bin_add_many(GST_BIN(pipeline), source, demuxer, decoder, conv,
341  resampler, sink, NULL);
342 
343  /* we link the elements together */
344  gst_element_link_many(source, demuxer, NULL);
345 
346  /* Set the pipeline to "playing" state*/
347  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Now playing\n");
348  gst_element_set_state(pipeline, GST_STATE_PLAYING);
349 
350  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Running...\n");
351  /* Iterate */
352  toff = 0;
353  while (!abort_read)
354  {
355  char readbuf[MAXLINE];
356  int ret;
357 
358  ret = read(0, readbuf, sizeof(readbuf));
359  if (0 > ret)
360  {
362  _("Read error from STDIN: %d %s\n"),
363  ret, strerror(errno));
364  break;
365  }
366  toff += ret;
368  "Received %d bytes of audio data (total: %llu)\n",
369  (int)ret,
370  (unsigned long long)toff);
371  if (0 == ret)
372  break;
373 #ifdef DEBUG_READ_PURE_OGG
374  if (read_pure_ogg)
375  {
376  feed_buffer_to_gst(readbuf, ret);
377  }
378  else
379 #endif
380  GNUNET_MST_from_buffer(stdin_mst,
381  readbuf,
382  ret,
383  GNUNET_NO,
384  GNUNET_NO);
385  }
386  GNUNET_MST_destroy(stdin_mst);
387 
388  signal(SIGINT, inthandler);
389  signal(SIGINT, termhandler);
390 
392  "Returned, stopping playback\n");
393  quit();
394 
396  "Deleting pipeline\n");
397  gst_object_unref(GST_OBJECT(source));
398  source = NULL;
399  gst_object_unref(GST_OBJECT(pipeline));
400  pipeline = NULL;
401  g_source_remove(bus_watch_id);
402 
403  return 0;
404 }
static GstElement * resampler
struct GNUNET_MessageHeader * msg
Definition: 005.c:2
struct GNUNET_MessageHeader header
Type is GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO.
Definition: conversation.h:59
static void ogg_pad_added(GstElement *element, GstPad *pad, gpointer data)
static GstElement * conv
constants for network protocols
#define GNUNET_assert(cond)
Use this for fatal errors that cannot be handled.
static GstElement * pipeline
Main pipeline.
struct GNUNET_MessageStreamTokenizer * stdin_mst
Tokenizer for the data we get from stdin.
#define GNUNET_NO
Definition: gnunet_common.h:78
#define GNUNET_OK
Named constants for return values.
Definition: gnunet_common.h:75
uint16_t size
The length of the struct (in bytes, including the length field itself), in big-endian format...
static int ret
Final status code.
Definition: gnunet-arm.c:89
#define _(String)
GNU gettext support macro.
Definition: platform.h:181
uint16_t type
The type of the message (GNUNET_MESSAGE_TYPE_XXXX), in big-endian format.
static GstElement * sink
#define MAXLINE
How much data to read in one go.
void GNUNET_MST_destroy(struct GNUNET_MessageStreamTokenizer *mst)
Destroys a tokenizer.
Definition: mst.c:410
Message to transmit the audio (between client and helpers).
Definition: conversation.h:55
Handle to a message stream tokenizer.
Definition: mst.c:43
struct GNUNET_MessageStreamTokenizer * GNUNET_MST_create(GNUNET_MessageTokenizerCallback cb, void *cb_cls)
Create a message stream tokenizer.
Definition: mst.c:84
int GNUNET_MST_from_buffer(struct GNUNET_MessageStreamTokenizer *mst, const char *buf, size_t size, int purge, int one_shot)
Add incoming data to the receive buffer and call the callback for all complete messages.
Definition: mst.c:113
int main(int argc, char **argv)
#define gst_element_factory_make(element, name)
Definition: gnunet_gst.h:36
static void signalhandler(int s)
static void quit()
static GstElement * source
Appsrc instance into which we write data for the pipeline.
#define GNUNET_SYSERR
Definition: gnunet_common.h:76
static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data)
static GstElement * demuxer
const char * name
char * getenv()
#define LATENCY_TIME
Min number of microseconds to buffer in audiosink.
static int abort_read
Set to 1 to break the reading loop.
static void sink_child_added(GstChildProxy *child_proxy, GObject *object, gchar *name, gpointer user_data)
static int feed_buffer_to_gst(const char *audio, size_t b_len)
#define GNUNET_log(kind,...)
#define GNUNET_MESSAGE_TYPE_CONVERSATION_AUDIO
Message to transmit the audio between helper and speaker/microphone library.
Header for all communications.
static int stdin_receiver(void *cls, const struct GNUNET_MessageHeader *msg)
Message callback.
int GNUNET_log_setup(const char *comp, const char *loglevel, const char *logfile)
Setup logging.
static GstElement * decoder
uint32_t data
The data value.
#define BUFFER_TIME
Max number of microseconds to buffer in audiosink.