/pidgin/main: 3eab84b9b88f: libpurple: Add PurpleQueuedOutputStr...

Mike Ruprecht cmaiku at gmail.com
Sun May 15 11:11:26 EDT 2016


Changeset: 3eab84b9b88f88b6ac44a39a81b9af6932b8c4d1
Author:	 Mike Ruprecht <cmaiku at gmail.com>
Date:	 2016-04-29 02:03 -0500
Branch:	 default
URL: https://hg.pidgin.im/pidgin/main/rev/3eab84b9b88f

Description:

libpurple: Add PurpleQueuedOutputStream for fire and forget output

This patch adds a queued GOutputStream implementation to allow
queuing data to send even if a stream operation is in progress.

This could potentially be upstreamed to Gio with some
cleanup/reformatting.

diffstat:

 libpurple/Makefile.am          |    2 +
 libpurple/queuedoutputstream.c |  151 +++++++++++++++++++++++++++++++++++++++++
 libpurple/queuedoutputstream.h |  109 +++++++++++++++++++++++++++++
 3 files changed, 262 insertions(+), 0 deletions(-)

diffs (289 lines):

diff --git a/libpurple/Makefile.am b/libpurple/Makefile.am
--- a/libpurple/Makefile.am
+++ b/libpurple/Makefile.am
@@ -93,6 +93,7 @@ purple_coresources = \
 	protocol.c \
 	protocols.c \
 	purple-socket.c \
+	queuedoutputstream.c \
 	request.c \
 	request-datasheet.c \
 	roomlist.c \
@@ -172,6 +173,7 @@ purple_coreheaders = \
 	protocol.h \
 	protocols.h \
 	purple-socket.h \
+	queuedoutputstream.h \
 	request.h \
 	request-datasheet.h \
 	roomlist.h \
diff --git a/libpurple/queuedoutputstream.c b/libpurple/queuedoutputstream.c
new file mode 100644
--- /dev/null
+++ b/libpurple/queuedoutputstream.c
@@ -0,0 +1,151 @@
+/*
+ *
+ * purple
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here.  Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
+ */
+
+#include "internal.h"
+#include "queuedoutputstream.h"
+
+struct _PurpleQueuedOutputStreamPrivate {
+	GAsyncQueue *queue;
+};
+
+GObjectClass *parent_class = NULL;
+
+#define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
+		(G_TYPE_INSTANCE_GET_PRIVATE((obj), \
+		PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
+		PurpleQueuedOutputStreamPrivate))
+
+G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
+		G_TYPE_FILTER_OUTPUT_STREAM,
+		G_ADD_PRIVATE(PurpleQueuedOutputStream))
+
+static void purple_queued_output_stream_dispose(GObject *object);
+static gboolean purple_queued_output_stream_flush(GOutputStream *stream,
+		GCancellable *cancellable, GError **error);
+
+static void
+purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
+{
+	GObjectClass *object_class;
+	GOutputStreamClass *ostream_class;
+
+	parent_class = g_type_class_peek_parent(klass);
+
+	object_class = G_OBJECT_CLASS(klass);
+	object_class->dispose = purple_queued_output_stream_dispose;
+
+	ostream_class = G_OUTPUT_STREAM_CLASS(klass);
+	ostream_class->flush = purple_queued_output_stream_flush;
+}
+
+PurpleQueuedOutputStream *
+purple_queued_output_stream_new(GOutputStream *base_stream)
+{
+	PurpleQueuedOutputStream *stream;
+
+	g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
+
+	stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
+			"base-stream", base_stream,
+			NULL);
+
+	return stream;
+}
+
+void
+purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
+		GBytes *bytes)
+{
+	g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
+	g_return_if_fail(bytes != NULL);
+
+	g_async_queue_push(stream->priv->queue, g_bytes_ref(bytes));
+}
+
+static void
+purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
+{
+	stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
+	stream->priv->queue =
+			g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
+}
+
+static void
+purple_queued_output_stream_dispose(GObject *object)
+{
+	PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
+
+	/* Chain up first in case the stream is flushed */
+	G_OBJECT_CLASS(parent_class)->dispose(object);
+
+	g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
+}
+
+static gboolean
+purple_queued_output_stream_flush(GOutputStream *stream,
+		GCancellable *cancellable, GError **error)
+{
+	GOutputStream *base_stream;
+	GAsyncQueue *queue;
+	GBytes *bytes;
+	const void *buffer;
+	gsize count;
+	gsize bytes_written = 0;
+	gboolean ret = TRUE;
+
+	base_stream = g_filter_output_stream_get_base_stream(
+			G_FILTER_OUTPUT_STREAM(stream));
+	queue = PURPLE_QUEUED_OUTPUT_STREAM(stream)->priv->queue;
+
+	do {
+		bytes = g_async_queue_try_pop(queue);
+
+		if (bytes == NULL) {
+			break;
+		}
+
+		buffer = g_bytes_get_data(bytes, &count);
+
+		ret = g_output_stream_write_all(base_stream, buffer, count,
+				&bytes_written, cancellable, error);
+
+		if (!ret) {
+			GBytes *queue_bytes;
+
+			if (bytes_written > 0) {
+				queue_bytes = g_bytes_new_from_bytes(bytes,
+						bytes_written,
+						count - bytes_written);
+			} else {
+				queue_bytes = g_bytes_ref(bytes);
+			}
+
+			g_async_queue_push_front(queue, queue_bytes);
+		}
+
+		g_bytes_unref(bytes);
+	} while (ret);
+
+	return ret;
+}
+
diff --git a/libpurple/queuedoutputstream.h b/libpurple/queuedoutputstream.h
new file mode 100644
--- /dev/null
+++ b/libpurple/queuedoutputstream.h
@@ -0,0 +1,109 @@
+/*
+ *
+ * purple
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here.  Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
+ */
+
+#ifndef _PURPLE_QUEUED_OUTPUT_STREAM_H
+#define _PURPLE_QUEUED_OUTPUT_STREAM_H
+/**
+ * SECTION:queued-output-stream
+ * @section_id: libpurple-queued-output-stream
+ * @short_description: GOutputStream for queuing data to output
+ * @title: GOutputStream class
+ *
+ * A #GQueuedOutputStream is a #GOutputStream which allows data to be queued
+ * for outputting. It differs from a #GBufferedOutputStream in that it allows
+ * for data to be queued while other operations are in progress.
+ */
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define PURPLE_TYPE_QUEUED_OUTPUT_STREAM		(purple_queued_output_stream_get_type())
+#define PURPLE_QUEUED_OUTPUT_STREAM(o)			(G_TYPE_CHECK_INSTANCE_CAST((o), PURPLE_TYPE_QUEUED_OUTPUT_STREAM, PurpleQueuedOutputStream))
+#define PURPLE_QUEUED_OUTPUT_STREAM_CLASS(k)		(G_TYPE_CHECK_CLASS_CAST((k), PURPLE_TYPE_QUEUED_OUTPUT_STREAM, GQueuedOutputStreamClass))
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM(o)		(G_TYPE_CHECK_INSTANCE_TYPE((o), PURPLE_TYPE_QUEUED_OUTPUT_STREAM)
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM_CLASS(k)		(G_TYPE_CHECK_CLASS_TYPE((k), PURPLE_TYPE_QUEUED_OUTPUT_STREAM))
+#define PURPLE_IS_QUEUED_OUTPUT_STREAM_GET_CLASS(o)	(G_TYPE_INSTANCE_GET_CLASS((o), PURPLE_TYPE_UEUED_OUTPUT_STREAM, GQueuedOutputStreamClass))
+
+/**
+ * PurpleQueuedOutputStream:
+ *
+ * An implementation of #GFilterOutputStream which allows queuing data for
+ * output. This allows data to be queued while other data is being output.
+ * Therefore, data doesn't have to be manually stored while waiting for
+ * stream operations to finish.
+ *
+ * To create a queued output stream, use #purple_queued_output_stream_new().
+ *
+ * To queue data, use #purple_queued_output_stream_push_bytes().
+ *
+ * Once data has been queued, flush the stream with #g_output_stream_flush()
+ * or #g_output_stream_flush_async().
+ */
+typedef struct _PurpleQueuedOutputStream	PurpleQueuedOutputStream;
+typedef struct _PurpleQueuedOutputStreamClass	PurpleQueuedOutputStreamClass;
+typedef struct _PurpleQueuedOutputStreamPrivate	PurpleQueuedOutputStreamPrivate;
+
+struct _PurpleQueuedOutputStream
+{
+	GFilterOutputStream parent_instance;
+
+	/*< protected >*/
+	PurpleQueuedOutputStreamPrivate *priv;
+};
+
+struct _PurpleQueuedOutputStreamClass
+{
+	GFilterOutputStreamClass parent_class;
+
+	/*< private >*/
+	/* Padding for future expansion */
+	void (*_g_reserved1)(void);
+	void (*_g_reserved2)(void);
+};
+
+GType purple_queued_output_stream_get_type(void) G_GNUC_CONST;
+
+/*
+ * purple_queued_output_stream_new
+ * @base_stream: Base output stream to wrap with the queued stream
+ *
+ * Creates a new queued output stream for a base stream.
+ */
+PurpleQueuedOutputStream *purple_queued_output_stream_new(
+		GOutputStream *base_stream);
+
+/*
+ * purple_queued_output_stream_push_bytes
+ * @stream: Stream to push bytes to
+ * @bytes: Bytes to queue
+ *
+ * Queues data to be output through the stream. Flush the stream to
+ * output this data.
+ */
+void purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
+		GBytes *bytes);
+
+G_END_DECLS
+
+#endif /* _PURPLE_QUEUED_OUTPUT_STREAM_H */



More information about the Commits mailing list