From ab60931620066f6704be3010903f779b3cb9c71a Mon Sep 17 00:00:00 2001 From: Will Thompson Date: Sun, 9 Apr 2017 10:21:56 +0100 Subject: Initial implementation of streaming API This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14. --- mastodon/Mastodon.py | 48 ++++++++++++ mastodon/__init__.py | 4 +- mastodon/streaming.py | 107 ++++++++++++++++++++++++++ setup.py | 2 +- tests/test_streaming.py | 195 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 mastodon/streaming.py create mode 100644 tests/test_streaming.py diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 9967bdb..493fb40 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -12,6 +12,9 @@ import datetime import dateutil import dateutil.parser +from contextlib import closing + + class Mastodon: """ Super basic but thorough and easy to use mastodon.social @@ -578,6 +581,37 @@ class Mastodon: media_file_description = (file_name, media_file, mime_type) return self.__api_request('POST', '/api/v1/media', files = {'file': media_file_description}) + def user_stream(self, listener): + """ + Streams events that are relevant to the authorized user, i.e. home + timeline and notifications. 'listener' should be a subclass of + StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/user', listener) + + def public_stream(self, listener): + """ + Streams public events. 'listener' should be a subclass of + StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/public', listener) + + def hashtag_stream(self, tag, listener): + """ + Returns all public statuses for the hashtag 'tag'. 'listener' should be + a subclass of StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) + ### # Internal helpers, dragons probably ### @@ -710,6 +744,20 @@ class Mastodon: return response + def __stream(self, endpoint, listener, params = {}): + """ + Internal streaming API helper. + """ + + headers = {} + if self.access_token != None: + headers = {'Authorization': 'Bearer ' + self.access_token} + + url = self.api_base_url + endpoint + with closing(requests.get(url, headers = headers, data = params, stream = True)) as r: + listener.handle_stream(r.iter_lines()) + + def __generate_params(self, params, exclude = []): """ Internal named-parameters-to-dict helper. diff --git a/mastodon/__init__.py b/mastodon/__init__.py index 17f63e6..9c8e39b 100644 --- a/mastodon/__init__.py +++ b/mastodon/__init__.py @@ -1,2 +1,4 @@ from mastodon.Mastodon import Mastodon -__all__ = ['Mastodon'] +from mastodon.streaming import StreamListener, MalformedEventError + +__all__ = ['Mastodon', 'StreamListener', 'MalformedEventError'] diff --git a/mastodon/streaming.py b/mastodon/streaming.py new file mode 100644 index 0000000..3212848 --- /dev/null +++ b/mastodon/streaming.py @@ -0,0 +1,107 @@ +''' +Handlers for the Streaming API: +https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md +''' + +import json +import logging +import six + + +log = logging.getLogger(__name__) + + +class MalformedEventError(Exception): + '''Raised when the server-sent event stream is malformed.''' + pass + + +class StreamListener(object): + '''Callbacks for the streaming API. Create a subclass, override the on_xxx + methods for the kinds of events you're interested in, then pass an instance + of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or + Mastodon.hashtag_stream().''' + + def on_update(self, status): + '''A new status has appeared! 'status' is the parsed JSON dictionary + describing the status.''' + pass + + def on_notification(self, notification): + '''A new notification. 'notification' is the parsed JSON dictionary + describing the notification.''' + pass + + def on_delete(self, status_id): + '''A status has been deleted. status_id is the status' integer ID.''' + pass + + def handle_heartbeat(self): + '''The server has sent us a keep-alive message. This callback may be + useful to carry out periodic housekeeping tasks, or just to confirm + that the connection is still open.''' + + def handle_stream(self, lines): + ''' + Handles a stream of events from the Mastodon server. When each event + is received, the corresponding .on_[name]() method is called. + + lines: an iterable of lines of bytes sent by the Mastodon server, as + returned by requests.Response.iter_lines(). + ''' + event = {} + for raw_line in lines: + try: + line = raw_line.decode('utf-8') + except UnicodeDecodeError as err: + six.raise_from( + MalformedEventError("Malformed UTF-8", line), + err + ) + + if line.startswith(':'): + self.handle_heartbeat() + elif line == '': + # end of event + self._despatch(event) + event = {} + else: + key, value = line.split(': ', 1) + # According to the MDN spec, repeating the 'data' key + # represents a newline(!) + if key in event: + event[key] += '\n' + value + else: + event[key] = value + + # end of stream + if event: + log.warn("outstanding partial event at end of stream: %s", event) + + def _despatch(self, event): + try: + name = event['event'] + data = event['data'] + payload = json.loads(data) + except KeyError as err: + six.raise_from( + MalformedEventError('Missing field', err.args[0], event), + err + ) + except ValueError as err: + # py2: plain ValueError + # py3: json.JSONDecodeError, a subclass of ValueError + six.raise_from( + MalformedEventError('Bad JSON', data), + err + ) + + handler_name = 'on_' + name + try: + handler = getattr(self, handler_name) + except AttributeError: + log.warn("Unhandled event '%s'", name) + else: + # TODO: allow handlers to return/raise to stop streaming cleanly + handler(payload) + diff --git a/setup.py b/setup.py index 59d684b..7f1401b 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup(name='Mastodon.py', packages=['mastodon'], setup_requires=['pytest-runner'], tests_require=['pytest'], - install_requires=['requests', 'dateutils'], + install_requires=['requests', 'dateutils', 'six'], url='https://github.com/halcy/Mastodon.py', author='Lorenz Diener', author_email='lorenzd+mastodonpypypi@gmail.com', diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..c79a8e9 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,195 @@ +import six +import pytest +import itertools +from mastodon.streaming import StreamListener, MalformedEventError + + +class Listener(StreamListener): + def __init__(self): + self.updates = [] + self.notifications = [] + self.deletes = [] + self.heartbeats = 0 + + def on_update(self, status): + self.updates.append(status) + + def on_notification(self, notification): + self.notifications.append(notification) + + def on_delete(self, status_id): + self.deletes.append(status_id) + + def handle_heartbeat(self): + self.heartbeats += 1 + + def handle_stream_(self, lines): + '''Test helper to avoid littering all tests with six.b().''' + return self.handle_stream(map(six.b, lines)) + +def test_heartbeat(): + listener = Listener() + listener.handle_stream_([':one', ':two']) + assert listener.heartbeats == 2 + + +def test_status(): + listener = Listener() + listener.handle_stream_([ + 'event: update', + 'data: {"foo": "bar"}', + '', + ]) + assert listener.updates == [{"foo": "bar"}] + + +def test_notification(): + listener = Listener() + listener.handle_stream_([ + 'event: notification', + 'data: {"foo": "bar"}', + '', + ]) + assert listener.notifications == [{"foo": "bar"}] + + +def test_delete(): + listener = Listener() + listener.handle_stream_([ + 'event: delete', + 'data: 123', + '', + ]) + assert listener.deletes == [123] + + +@pytest.mark.parametrize('events', itertools.permutations([ + ['event: update', 'data: {"foo": "bar"}', ''], + ['event: notification', 'data: {"foo": "bar"}', ''], + ['event: delete', 'data: 123', ''], + [':toot toot'], + [':beep beep'], +])) +def test_many(events): + listener = Listener() + stream = [ + line + for event in events + for line in event + ] + listener.handle_stream_(stream) + assert listener.updates == [{"foo": "bar"}] + assert listener.notifications == [{"foo": "bar"}] + assert listener.deletes == [123] + assert listener.heartbeats == 2 + + +def test_unknown_event(): + '''Be tolerant of new event types''' + listener = Listener() + listener.handle_stream_([ + 'event: blahblah', + 'data: {}', + '', + ]) + assert listener.updates == [] + assert listener.notifications == [] + assert listener.deletes == [] + assert listener.heartbeats == 0 + + +def test_missing_event_name(): + listener = Listener() + with pytest.raises(MalformedEventError): + listener.handle_stream_([ + 'data: {}', + '', + ]) + + assert listener.updates == [] + assert listener.notifications == [] + assert listener.deletes == [] + assert listener.heartbeats == 0 + + +def test_missing_data(): + listener = Listener() + with pytest.raises(MalformedEventError): + listener.handle_stream_([ + 'event: update', + '', + ]) + + assert listener.updates == [] + assert listener.notifications == [] + assert listener.deletes == [] + assert listener.heartbeats == 0 + + +def test_sse_order_doesnt_matter(): + listener = Listener() + listener.handle_stream_([ + 'data: {"foo": "bar"}', + 'event: update', + '', + ]) + assert listener.updates == [{"foo": "bar"}] + + +def test_extra_keys_ignored(): + ''' + https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format + defines 'id' and 'retry' keys which the Mastodon streaming API doesn't use, + and alleges that "All other field names are ignored". + ''' + listener = Listener() + listener.handle_stream_([ + 'event: update', + 'data: {"foo": "bar"}', + 'id: 123', + 'retry: 456', + 'ignoreme: blah blah blah', + '', + ]) + assert listener.updates == [{"foo": "bar"}] + + +def test_valid_utf8(): + '''Snowman Cat Face With Tears Of Joy''' + listener = Listener() + listener.handle_stream_([ + 'event: update', + 'data: {"foo": "\xE2\x98\x83\xF0\x9F\x98\xB9"}', + '', + ]) + assert listener.updates == [{"foo": u"\u2603\U0001F639"}] + + +def test_invalid_utf8(): + '''Cat Face With Tears O''' + listener = Listener() + with pytest.raises(MalformedEventError): + listener.handle_stream_([ + 'event: update', + 'data: {"foo": "\xF0\x9F\x98"}', + '', + ]) + + +def test_multiline_payload(): + ''' + https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Data-only_messages + says that newlines in the 'data' field can be encoded by sending the field + twice! This would be really pathological for Mastodon because the payload + is JSON, but technically literal newlines are permissible (outside strings) + so let's handle this case. + ''' + listener = Listener() + listener.handle_stream_([ + 'event: update', + 'data: {"foo":', + 'data: "bar"', + 'data: }', + '', + ]) + assert listener.updates == [{"foo": "bar"}] -- cgit v1.2.3