/pidgin/main: 3eab84b9b88f: libpurple: Add PurpleQueuedOutputStr...
Mike Ruprecht
cmaiku at gmail.com
Sun May 15 11:11:26 EDT 2016
Changeset: 3eab84b9b88f88b6ac44a39a81b9af6932b8c4d1
Author: Mike Ruprecht <cmaiku at gmail.com>
Date: 2016-04-29 02:03 -0500
Branch: default
URL: https://hg.pidgin.im/pidgin/main/rev/3eab84b9b88f
Description:
libpurple: Add PurpleQueuedOutputStream for fire and forget output
This patch adds a queued GOutputStream implementation to allow
queuing data to send even if a stream operation is in progress.
This could potentially be upstreamed to Gio with some
cleanup/reformatting.
diffstat:
libpurple/Makefile.am | 2 +
libpurple/queuedoutputstream.c | 151 +++++++++++++++++++++++++++++++++++++++++
libpurple/queuedoutputstream.h | 109 +++++++++++++++++++++++++++++
3 files changed, 262 insertions(+), 0 deletions(-)
diffs (289 lines):
diff --git a/libpurple/Makefile.am b/libpurple/Makefile.am
--- a/libpurple/Makefile.am
+++ b/libpurple/Makefile.am
@@ -93,6 +93,7 @@ purple_coresources = \
protocol.c \
protocols.c \
purple-socket.c \
+ queuedoutputstream.c \
request.c \
request-datasheet.c \
roomlist.c \
@@ -172,6 +173,7 @@ purple_coreheaders = \
protocol.h \
protocols.h \
purple-socket.h \
+ queuedoutputstream.h \
request.h \
request-datasheet.h \
roomlist.h \
diff --git a/libpurple/queuedoutputstream.c b/libpurple/queuedoutputstream.c
new file mode 100644
--- /dev/null
+++ b/libpurple/queuedoutputstream.c
@@ -0,0 +1,151 @@
+/*
+ *
+ * purple
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here. Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
+ */
+
+#include "internal.h"
+#include "queuedoutputstream.h"
+
+struct _PurpleQueuedOutputStreamPrivate {
+ GAsyncQueue *queue;
+};
+
+GObjectClass *parent_class = NULL;
+
+#define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
+ (G_TYPE_INSTANCE_GET_PRIVATE((obj), \
+ PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
+ PurpleQueuedOutputStreamPrivate))
+
+G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
+ G_TYPE_FILTER_OUTPUT_STREAM,
+ G_ADD_PRIVATE(PurpleQueuedOutputStream))
+
+static void purple_queued_output_stream_dispose(GObject *object);
+static gboolean purple_queued_output_stream_flush(GOutputStream *stream,
+ GCancellable *cancellable, GError **error);
+
+static void
+purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
+{
+ GObjectClass *object_class;
+ GOutputStreamClass *ostream_class;
+
+ parent_class = g_type_class_peek_parent(klass);
+
+ object_class = G_OBJECT_CLASS(klass);
+ object_class->dispose = purple_queued_output_stream_dispose;
+
+ ostream_class = G_OUTPUT_STREAM_CLASS(klass);
+ ostream_class->flush = purple_queued_output_stream_flush;
+}
+
+PurpleQueuedOutputStream *
+purple_queued_output_stream_new(GOutputStream *base_stream)
+{
+ PurpleQueuedOutputStream *stream;
+
+ g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
+
+ stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
+ "base-stream", base_stream,
+ NULL);
+
+ return stream;
+}
+
+void
+purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
+ GBytes *bytes)
+{
+ g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
+ g_return_if_fail(bytes != NULL);
+
+ g_async_queue_push(stream->priv->queue, g_bytes_ref(bytes));
+}
+
+static void
+purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
+{
+ stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
+ stream->priv->queue =
+ g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
+}
+
+static void
+purple_queued_output_stream_dispose(GObject *object)
+{
+ PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
+
+ /* Chain up first in case the stream is flushed */
+ G_OBJECT_CLASS(parent_class)->dispose(object);
+
+ g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
+}
+
+static gboolean
+purple_queued_output_stream_flush(GOutputStream *stream,
+ GCancellable *cancellable, GError **error)
+{
+ GOutputStream *base_stream;
+ GAsyncQueue *queue;
+ GBytes *bytes;
+ const void *buffer;
+ gsize count;
+ gsize bytes_written = 0;
+ gboolean ret = TRUE;
+
+ base_stream = g_filter_output_stream_get_base_stream(
+ G_FILTER_OUTPUT_STREAM(stream));
+ queue = PURPLE_QUEUED_OUTPUT_STREAM(stream)->priv->queue;
+
+ do {
+ bytes = g_async_queue_try_pop(queue);
+
+ if (bytes == NULL) {
+ break;
+ }
+
+ buffer = g_bytes_get_data(bytes, &count);
+
+ ret = g_output_stream_write_all(base_stream, buffer, count,
+ &bytes_written, cancellable, error);
+
+ if (!ret) {
+ GBytes *queue_bytes;
+
+ if (bytes_written > 0) {
+ queue_bytes = g_bytes_new_from_bytes(bytes,
+ bytes_written,
+ count - bytes_written);
+ } else {
+ queue_bytes = g_bytes_ref(bytes);
+ }
+
+ g_async_queue_push_front(queue, queue_bytes);
+ }
+
+ g_bytes_unref(bytes);
+ } while (ret);
+
+ return ret;
+}
+
diff --git a/libpurple/queuedoutputstream.h b/libpurple/queuedoutputstream.h
new file mode 100644
--- /dev/null
+++ b/libpurple/queuedoutputstream.h
@@ -0,0 +1,109 @@
+/*
+ *
+ * purple
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here. Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
+ */
+
+#ifndef _PURPLE_QUEUED_OUTPUT_STREAM_H
+#define _PURPLE_QUEUED_OUTPUT_STREAM_H
+/**
+ * SECTION:queued-output-stream
+ * @section_id: libpurple-queued-output-stream
+ * @short_description: GOutputStream for queuing data to output
+ * @title: GOutputStream class
+ *
+ * A #GQueuedOutputStream is a #GOutputStream which allows data to be queued
+ * for outputting. It differs from a #GBufferedOutputStream in that it allows
+ * for data to be queued while other operations are in progress.
+ */
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define PURPLE_TYPE_QUEUED_OUTPUT_STREAM (purple_queued_output_stream_get_type())
+#define PURPLE_QUEUED_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST((o), PURPLE_TYPE_QUEUED_OUTPUT_STREAM, PurpleQueuedOutputStream))
+#define PURPLE_QUEUED_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), PURPLE_TYPE_QUEUED_OUTPUT_STREAM, GQueuedOutputStreamClass))
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE((o), PURPLE_TYPE_QUEUED_OUTPUT_STREAM)
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE((k), PURPLE_TYPE_QUEUED_OUTPUT_STREAM))
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS((o), PURPLE_TYPE_UEUED_OUTPUT_STREAM, GQueuedOutputStreamClass))
+
+/**
+ * PurpleQueuedOutputStream:
+ *
+ * An implementation of #GFilterOutputStream which allows queuing data for
+ * output. This allows data to be queued while other data is being output.
+ * Therefore, data doesn't have to be manually stored while waiting for
+ * stream operations to finish.
+ *
+ * To create a queued output stream, use #purple_queued_output_stream_new().
+ *
+ * To queue data, use #purple_queued_output_stream_push_bytes().
+ *
+ * Once data has been queued, flush the stream with #g_output_stream_flush()
+ * or #g_output_stream_flush_async().
+ */
+typedef struct _PurpleQueuedOutputStream PurpleQueuedOutputStream;
+typedef struct _PurpleQueuedOutputStreamClass PurpleQueuedOutputStreamClass;
+typedef struct _PurpleQueuedOutputStreamPrivate PurpleQueuedOutputStreamPrivate;
+
+struct _PurpleQueuedOutputStream
+{
+ GFilterOutputStream parent_instance;
+
+ /*< protected >*/
+ PurpleQueuedOutputStreamPrivate *priv;
+};
+
+struct _PurpleQueuedOutputStreamClass
+{
+ GFilterOutputStreamClass parent_class;
+
+ /*< private >*/
+ /* Padding for future expansion */
+ void (*_g_reserved1)(void);
+ void (*_g_reserved2)(void);
+};
+
+GType purple_queued_output_stream_get_type(void) G_GNUC_CONST;
+
+/*
+ * purple_queued_output_stream_new
+ * @base_stream: Base output stream to wrap with the queued stream
+ *
+ * Creates a new queued output stream for a base stream.
+ */
+PurpleQueuedOutputStream *purple_queued_output_stream_new(
+ GOutputStream *base_stream);
+
+/*
+ * purple_queued_output_stream_push_bytes
+ * @stream: Stream to push bytes to
+ * @bytes: Bytes to queue
+ *
+ * Queues data to be output through the stream. Flush the stream to
+ * output this data.
+ */
+void purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
+ GBytes *bytes);
+
+G_END_DECLS
+
+#endif /* _PURPLE_QUEUED_OUTPUT_STREAM_H */
More information about the Commits
mailing list