diff options
Diffstat (limited to 'mastodon/streaming.py')
-rw-r--r-- | mastodon/streaming.py | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/mastodon/streaming.py b/mastodon/streaming.py new file mode 100644 index 0000000..3212848 --- /dev/null +++ b/mastodon/streaming.py | |||
@@ -0,0 +1,107 @@ | |||
1 | ''' | ||
2 | Handlers for the Streaming API: | ||
3 | https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md | ||
4 | ''' | ||
5 | |||
6 | import json | ||
7 | import logging | ||
8 | import six | ||
9 | |||
10 | |||
11 | log = logging.getLogger(__name__) | ||
12 | |||
13 | |||
14 | class MalformedEventError(Exception): | ||
15 | '''Raised when the server-sent event stream is malformed.''' | ||
16 | pass | ||
17 | |||
18 | |||
19 | class StreamListener(object): | ||
20 | '''Callbacks for the streaming API. Create a subclass, override the on_xxx | ||
21 | methods for the kinds of events you're interested in, then pass an instance | ||
22 | of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or | ||
23 | Mastodon.hashtag_stream().''' | ||
24 | |||
25 | def on_update(self, status): | ||
26 | '''A new status has appeared! 'status' is the parsed JSON dictionary | ||
27 | describing the status.''' | ||
28 | pass | ||
29 | |||
30 | def on_notification(self, notification): | ||
31 | '''A new notification. 'notification' is the parsed JSON dictionary | ||
32 | describing the notification.''' | ||
33 | pass | ||
34 | |||
35 | def on_delete(self, status_id): | ||
36 | '''A status has been deleted. status_id is the status' integer ID.''' | ||
37 | pass | ||
38 | |||
39 | def handle_heartbeat(self): | ||
40 | '''The server has sent us a keep-alive message. This callback may be | ||
41 | useful to carry out periodic housekeeping tasks, or just to confirm | ||
42 | that the connection is still open.''' | ||
43 | |||
44 | def handle_stream(self, lines): | ||
45 | ''' | ||
46 | Handles a stream of events from the Mastodon server. When each event | ||
47 | is received, the corresponding .on_[name]() method is called. | ||
48 | |||
49 | lines: an iterable of lines of bytes sent by the Mastodon server, as | ||
50 | returned by requests.Response.iter_lines(). | ||
51 | ''' | ||
52 | event = {} | ||
53 | for raw_line in lines: | ||
54 | try: | ||
55 | line = raw_line.decode('utf-8') | ||
56 | except UnicodeDecodeError as err: | ||
57 | six.raise_from( | ||
58 | MalformedEventError("Malformed UTF-8", line), | ||
59 | err | ||
60 | ) | ||
61 | |||
62 | if line.startswith(':'): | ||
63 | self.handle_heartbeat() | ||
64 | elif line == '': | ||
65 | # end of event | ||
66 | self._despatch(event) | ||
67 | event = {} | ||
68 | else: | ||
69 | key, value = line.split(': ', 1) | ||
70 | # According to the MDN spec, repeating the 'data' key | ||
71 | # represents a newline(!) | ||
72 | if key in event: | ||
73 | event[key] += '\n' + value | ||
74 | else: | ||
75 | event[key] = value | ||
76 | |||
77 | # end of stream | ||
78 | if event: | ||
79 | log.warn("outstanding partial event at end of stream: %s", event) | ||
80 | |||
81 | def _despatch(self, event): | ||
82 | try: | ||
83 | name = event['event'] | ||
84 | data = event['data'] | ||
85 | payload = json.loads(data) | ||
86 | except KeyError as err: | ||
87 | six.raise_from( | ||
88 | MalformedEventError('Missing field', err.args[0], event), | ||
89 | err | ||
90 | ) | ||
91 | except ValueError as err: | ||
92 | # py2: plain ValueError | ||
93 | # py3: json.JSONDecodeError, a subclass of ValueError | ||
94 | six.raise_from( | ||
95 | MalformedEventError('Bad JSON', data), | ||
96 | err | ||
97 | ) | ||
98 | |||
99 | handler_name = 'on_' + name | ||
100 | try: | ||
101 | handler = getattr(self, handler_name) | ||
102 | except AttributeError: | ||
103 | log.warn("Unhandled event '%s'", name) | ||
104 | else: | ||
105 | # TODO: allow handlers to return/raise to stop streaming cleanly | ||
106 | handler(payload) | ||
107 | |||