From ab60931620066f6704be3010903f779b3cb9c71a Mon Sep 17 00:00:00 2001 From: Will Thompson Date: Sun, 9 Apr 2017 10:21:56 +0100 Subject: Initial implementation of streaming API This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14. --- mastodon/streaming.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 mastodon/streaming.py (limited to 'mastodon/streaming.py') diff --git a/mastodon/streaming.py b/mastodon/streaming.py new file mode 100644 index 0000000..3212848 --- /dev/null +++ b/mastodon/streaming.py @@ -0,0 +1,107 @@ +''' +Handlers for the Streaming API: +https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md +''' + +import json +import logging +import six + + +log = logging.getLogger(__name__) + + +class MalformedEventError(Exception): + '''Raised when the server-sent event stream is malformed.''' + pass + + +class StreamListener(object): + '''Callbacks for the streaming API. Create a subclass, override the on_xxx + methods for the kinds of events you're interested in, then pass an instance + of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or + Mastodon.hashtag_stream().''' + + def on_update(self, status): + '''A new status has appeared! 'status' is the parsed JSON dictionary + describing the status.''' + pass + + def on_notification(self, notification): + '''A new notification. 'notification' is the parsed JSON dictionary + describing the notification.''' + pass + + def on_delete(self, status_id): + '''A status has been deleted. status_id is the status' integer ID.''' + pass + + def handle_heartbeat(self): + '''The server has sent us a keep-alive message. This callback may be + useful to carry out periodic housekeeping tasks, or just to confirm + that the connection is still open.''' + + def handle_stream(self, lines): + ''' + Handles a stream of events from the Mastodon server. When each event + is received, the corresponding .on_[name]() method is called. + + lines: an iterable of lines of bytes sent by the Mastodon server, as + returned by requests.Response.iter_lines(). + ''' + event = {} + for raw_line in lines: + try: + line = raw_line.decode('utf-8') + except UnicodeDecodeError as err: + six.raise_from( + MalformedEventError("Malformed UTF-8", line), + err + ) + + if line.startswith(':'): + self.handle_heartbeat() + elif line == '': + # end of event + self._despatch(event) + event = {} + else: + key, value = line.split(': ', 1) + # According to the MDN spec, repeating the 'data' key + # represents a newline(!) + if key in event: + event[key] += '\n' + value + else: + event[key] = value + + # end of stream + if event: + log.warn("outstanding partial event at end of stream: %s", event) + + def _despatch(self, event): + try: + name = event['event'] + data = event['data'] + payload = json.loads(data) + except KeyError as err: + six.raise_from( + MalformedEventError('Missing field', err.args[0], event), + err + ) + except ValueError as err: + # py2: plain ValueError + # py3: json.JSONDecodeError, a subclass of ValueError + six.raise_from( + MalformedEventError('Bad JSON', data), + err + ) + + handler_name = 'on_' + name + try: + handler = getattr(self, handler_name) + except AttributeError: + log.warn("Unhandled event '%s'", name) + else: + # TODO: allow handlers to return/raise to stop streaming cleanly + handler(payload) + -- cgit v1.2.3