From ab60931620066f6704be3010903f779b3cb9c71a Mon Sep 17 00:00:00 2001 From: Will Thompson Date: Sun, 9 Apr 2017 10:21:56 +0100 Subject: Initial implementation of streaming API This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14. --- mastodon/Mastodon.py | 48 ++++++++++++++++++++++ mastodon/__init__.py | 4 +- mastodon/streaming.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+), 1 deletion(-) create mode 100644 mastodon/streaming.py (limited to 'mastodon') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 9967bdb..493fb40 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -12,6 +12,9 @@ import datetime import dateutil import dateutil.parser +from contextlib import closing + + class Mastodon: """ Super basic but thorough and easy to use mastodon.social @@ -578,6 +581,37 @@ class Mastodon: media_file_description = (file_name, media_file, mime_type) return self.__api_request('POST', '/api/v1/media', files = {'file': media_file_description}) + def user_stream(self, listener): + """ + Streams events that are relevant to the authorized user, i.e. home + timeline and notifications. 'listener' should be a subclass of + StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/user', listener) + + def public_stream(self, listener): + """ + Streams public events. 'listener' should be a subclass of + StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/public', listener) + + def hashtag_stream(self, tag, listener): + """ + Returns all public statuses for the hashtag 'tag'. 'listener' should be + a subclass of StreamListener. + + This method blocks forever, calling callbacks on 'listener' for + incoming events. + """ + return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) + ### # Internal helpers, dragons probably ### @@ -710,6 +744,20 @@ class Mastodon: return response + def __stream(self, endpoint, listener, params = {}): + """ + Internal streaming API helper. + """ + + headers = {} + if self.access_token != None: + headers = {'Authorization': 'Bearer ' + self.access_token} + + url = self.api_base_url + endpoint + with closing(requests.get(url, headers = headers, data = params, stream = True)) as r: + listener.handle_stream(r.iter_lines()) + + def __generate_params(self, params, exclude = []): """ Internal named-parameters-to-dict helper. diff --git a/mastodon/__init__.py b/mastodon/__init__.py index 17f63e6..9c8e39b 100644 --- a/mastodon/__init__.py +++ b/mastodon/__init__.py @@ -1,2 +1,4 @@ from mastodon.Mastodon import Mastodon -__all__ = ['Mastodon'] +from mastodon.streaming import StreamListener, MalformedEventError + +__all__ = ['Mastodon', 'StreamListener', 'MalformedEventError'] diff --git a/mastodon/streaming.py b/mastodon/streaming.py new file mode 100644 index 0000000..3212848 --- /dev/null +++ b/mastodon/streaming.py @@ -0,0 +1,107 @@ +''' +Handlers for the Streaming API: +https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md +''' + +import json +import logging +import six + + +log = logging.getLogger(__name__) + + +class MalformedEventError(Exception): + '''Raised when the server-sent event stream is malformed.''' + pass + + +class StreamListener(object): + '''Callbacks for the streaming API. Create a subclass, override the on_xxx + methods for the kinds of events you're interested in, then pass an instance + of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or + Mastodon.hashtag_stream().''' + + def on_update(self, status): + '''A new status has appeared! 'status' is the parsed JSON dictionary + describing the status.''' + pass + + def on_notification(self, notification): + '''A new notification. 'notification' is the parsed JSON dictionary + describing the notification.''' + pass + + def on_delete(self, status_id): + '''A status has been deleted. status_id is the status' integer ID.''' + pass + + def handle_heartbeat(self): + '''The server has sent us a keep-alive message. This callback may be + useful to carry out periodic housekeeping tasks, or just to confirm + that the connection is still open.''' + + def handle_stream(self, lines): + ''' + Handles a stream of events from the Mastodon server. When each event + is received, the corresponding .on_[name]() method is called. + + lines: an iterable of lines of bytes sent by the Mastodon server, as + returned by requests.Response.iter_lines(). + ''' + event = {} + for raw_line in lines: + try: + line = raw_line.decode('utf-8') + except UnicodeDecodeError as err: + six.raise_from( + MalformedEventError("Malformed UTF-8", line), + err + ) + + if line.startswith(':'): + self.handle_heartbeat() + elif line == '': + # end of event + self._despatch(event) + event = {} + else: + key, value = line.split(': ', 1) + # According to the MDN spec, repeating the 'data' key + # represents a newline(!) + if key in event: + event[key] += '\n' + value + else: + event[key] = value + + # end of stream + if event: + log.warn("outstanding partial event at end of stream: %s", event) + + def _despatch(self, event): + try: + name = event['event'] + data = event['data'] + payload = json.loads(data) + except KeyError as err: + six.raise_from( + MalformedEventError('Missing field', err.args[0], event), + err + ) + except ValueError as err: + # py2: plain ValueError + # py3: json.JSONDecodeError, a subclass of ValueError + six.raise_from( + MalformedEventError('Bad JSON', data), + err + ) + + handler_name = 'on_' + name + try: + handler = getattr(self, handler_name) + except AttributeError: + log.warn("Unhandled event '%s'", name) + else: + # TODO: allow handlers to return/raise to stop streaming cleanly + handler(payload) + -- cgit v1.2.3