aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Thompson <[email protected]>2017-04-09 10:21:56 +0100
committerWill Thompson <[email protected]>2017-04-10 08:18:08 +0100
commitab60931620066f6704be3010903f779b3cb9c71a (patch)
tree86211f5f6d5ca7ffb495d48ce98c6d71c30e100b /mastodon/streaming.py
parent280c60120beb13d00c807c418c765b93da248b19 (diff)
downloadmastodon.py-ab60931620066f6704be3010903f779b3cb9c71a.tar.gz
Initial implementation of streaming API
This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14.
Diffstat (limited to 'mastodon/streaming.py')
-rw-r--r--mastodon/streaming.py107
1 files changed, 107 insertions, 0 deletions
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
new file mode 100644
index 0000000..3212848
--- /dev/null
+++ b/mastodon/streaming.py
@@ -0,0 +1,107 @@
1'''
2Handlers for the Streaming API:
3https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md
4'''
5
6import json
7import logging
8import six
9
10
11log = logging.getLogger(__name__)
12
13
14class MalformedEventError(Exception):
15 '''Raised when the server-sent event stream is malformed.'''
16 pass
17
18
19class StreamListener(object):
20 '''Callbacks for the streaming API. Create a subclass, override the on_xxx
21 methods for the kinds of events you're interested in, then pass an instance
22 of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or
23 Mastodon.hashtag_stream().'''
24
25 def on_update(self, status):
26 '''A new status has appeared! 'status' is the parsed JSON dictionary
27 describing the status.'''
28 pass
29
30 def on_notification(self, notification):
31 '''A new notification. 'notification' is the parsed JSON dictionary
32 describing the notification.'''
33 pass
34
35 def on_delete(self, status_id):
36 '''A status has been deleted. status_id is the status' integer ID.'''
37 pass
38
39 def handle_heartbeat(self):
40 '''The server has sent us a keep-alive message. This callback may be
41 useful to carry out periodic housekeeping tasks, or just to confirm
42 that the connection is still open.'''
43
44 def handle_stream(self, lines):
45 '''
46 Handles a stream of events from the Mastodon server. When each event
47 is received, the corresponding .on_[name]() method is called.
48
49 lines: an iterable of lines of bytes sent by the Mastodon server, as
50 returned by requests.Response.iter_lines().
51 '''
52 event = {}
53 for raw_line in lines:
54 try:
55 line = raw_line.decode('utf-8')
56 except UnicodeDecodeError as err:
57 six.raise_from(
58 MalformedEventError("Malformed UTF-8", line),
59 err
60 )
61
62 if line.startswith(':'):
63 self.handle_heartbeat()
64 elif line == '':
65 # end of event
66 self._despatch(event)
67 event = {}
68 else:
69 key, value = line.split(': ', 1)
70 # According to the MDN spec, repeating the 'data' key
71 # represents a newline(!)
72 if key in event:
73 event[key] += '\n' + value
74 else:
75 event[key] = value
76
77 # end of stream
78 if event:
79 log.warn("outstanding partial event at end of stream: %s", event)
80
81 def _despatch(self, event):
82 try:
83 name = event['event']
84 data = event['data']
85 payload = json.loads(data)
86 except KeyError as err:
87 six.raise_from(
88 MalformedEventError('Missing field', err.args[0], event),
89 err
90 )
91 except ValueError as err:
92 # py2: plain ValueError
93 # py3: json.JSONDecodeError, a subclass of ValueError
94 six.raise_from(
95 MalformedEventError('Bad JSON', data),
96 err
97 )
98
99 handler_name = 'on_' + name
100 try:
101 handler = getattr(self, handler_name)
102 except AttributeError:
103 log.warn("Unhandled event '%s'", name)
104 else:
105 # TODO: allow handlers to return/raise to stop streaming cleanly
106 handler(payload)
107
Powered by cgit v1.2.3 (git 2.41.0)