aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Thompson <[email protected]>2017-04-09 10:21:56 +0100
committerWill Thompson <[email protected]>2017-04-10 08:18:08 +0100
commitab60931620066f6704be3010903f779b3cb9c71a (patch)
tree86211f5f6d5ca7ffb495d48ce98c6d71c30e100b /mastodon
parent280c60120beb13d00c807c418c765b93da248b19 (diff)
downloadmastodon.py-ab60931620066f6704be3010903f779b3cb9c71a.tar.gz
Initial implementation of streaming API
This is missing any error handling and rate-limiting around the stream itself, but once the stream is established, the full range of events are supported. Fixes issue #14.
Diffstat (limited to 'mastodon')
-rw-r--r--mastodon/Mastodon.py48
-rw-r--r--mastodon/__init__.py4
-rw-r--r--mastodon/streaming.py107
3 files changed, 158 insertions, 1 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index 9967bdb..493fb40 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -12,6 +12,9 @@ import datetime
12import dateutil 12import dateutil
13import dateutil.parser 13import dateutil.parser
14 14
15from contextlib import closing
16
17
15class Mastodon: 18class Mastodon:
16 """ 19 """
17 Super basic but thorough and easy to use mastodon.social 20 Super basic but thorough and easy to use mastodon.social
@@ -578,6 +581,37 @@ class Mastodon:
578 media_file_description = (file_name, media_file, mime_type) 581 media_file_description = (file_name, media_file, mime_type)
579 return self.__api_request('POST', '/api/v1/media', files = {'file': media_file_description}) 582 return self.__api_request('POST', '/api/v1/media', files = {'file': media_file_description})
580 583
584 def user_stream(self, listener):
585 """
586 Streams events that are relevant to the authorized user, i.e. home
587 timeline and notifications. 'listener' should be a subclass of
588 StreamListener.
589
590 This method blocks forever, calling callbacks on 'listener' for
591 incoming events.
592 """
593 return self.__stream('/api/v1/streaming/user', listener)
594
595 def public_stream(self, listener):
596 """
597 Streams public events. 'listener' should be a subclass of
598 StreamListener.
599
600 This method blocks forever, calling callbacks on 'listener' for
601 incoming events.
602 """
603 return self.__stream('/api/v1/streaming/public', listener)
604
605 def hashtag_stream(self, tag, listener):
606 """
607 Returns all public statuses for the hashtag 'tag'. 'listener' should be
608 a subclass of StreamListener.
609
610 This method blocks forever, calling callbacks on 'listener' for
611 incoming events.
612 """
613 return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag})
614
581 ### 615 ###
582 # Internal helpers, dragons probably 616 # Internal helpers, dragons probably
583 ### 617 ###
@@ -710,6 +744,20 @@ class Mastodon:
710 744
711 return response 745 return response
712 746
747 def __stream(self, endpoint, listener, params = {}):
748 """
749 Internal streaming API helper.
750 """
751
752 headers = {}
753 if self.access_token != None:
754 headers = {'Authorization': 'Bearer ' + self.access_token}
755
756 url = self.api_base_url + endpoint
757 with closing(requests.get(url, headers = headers, data = params, stream = True)) as r:
758 listener.handle_stream(r.iter_lines())
759
760
713 def __generate_params(self, params, exclude = []): 761 def __generate_params(self, params, exclude = []):
714 """ 762 """
715 Internal named-parameters-to-dict helper. 763 Internal named-parameters-to-dict helper.
diff --git a/mastodon/__init__.py b/mastodon/__init__.py
index 17f63e6..9c8e39b 100644
--- a/mastodon/__init__.py
+++ b/mastodon/__init__.py
@@ -1,2 +1,4 @@
1from mastodon.Mastodon import Mastodon 1from mastodon.Mastodon import Mastodon
2__all__ = ['Mastodon'] 2from mastodon.streaming import StreamListener, MalformedEventError
3
4__all__ = ['Mastodon', 'StreamListener', 'MalformedEventError']
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
new file mode 100644
index 0000000..3212848
--- /dev/null
+++ b/mastodon/streaming.py
@@ -0,0 +1,107 @@
1'''
2Handlers for the Streaming API:
3https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-API.md
4'''
5
6import json
7import logging
8import six
9
10
11log = logging.getLogger(__name__)
12
13
14class MalformedEventError(Exception):
15 '''Raised when the server-sent event stream is malformed.'''
16 pass
17
18
19class StreamListener(object):
20 '''Callbacks for the streaming API. Create a subclass, override the on_xxx
21 methods for the kinds of events you're interested in, then pass an instance
22 of your subclass to Mastodon.user_stream(), Mastodon.public_stream(), or
23 Mastodon.hashtag_stream().'''
24
25 def on_update(self, status):
26 '''A new status has appeared! 'status' is the parsed JSON dictionary
27 describing the status.'''
28 pass
29
30 def on_notification(self, notification):
31 '''A new notification. 'notification' is the parsed JSON dictionary
32 describing the notification.'''
33 pass
34
35 def on_delete(self, status_id):
36 '''A status has been deleted. status_id is the status' integer ID.'''
37 pass
38
39 def handle_heartbeat(self):
40 '''The server has sent us a keep-alive message. This callback may be
41 useful to carry out periodic housekeeping tasks, or just to confirm
42 that the connection is still open.'''
43
44 def handle_stream(self, lines):
45 '''
46 Handles a stream of events from the Mastodon server. When each event
47 is received, the corresponding .on_[name]() method is called.
48
49 lines: an iterable of lines of bytes sent by the Mastodon server, as
50 returned by requests.Response.iter_lines().
51 '''
52 event = {}
53 for raw_line in lines:
54 try:
55 line = raw_line.decode('utf-8')
56 except UnicodeDecodeError as err:
57 six.raise_from(
58 MalformedEventError("Malformed UTF-8", line),
59 err
60 )
61
62 if line.startswith(':'):
63 self.handle_heartbeat()
64 elif line == '':
65 # end of event
66 self._despatch(event)
67 event = {}
68 else:
69 key, value = line.split(': ', 1)
70 # According to the MDN spec, repeating the 'data' key
71 # represents a newline(!)
72 if key in event:
73 event[key] += '\n' + value
74 else:
75 event[key] = value
76
77 # end of stream
78 if event:
79 log.warn("outstanding partial event at end of stream: %s", event)
80
81 def _despatch(self, event):
82 try:
83 name = event['event']
84 data = event['data']
85 payload = json.loads(data)
86 except KeyError as err:
87 six.raise_from(
88 MalformedEventError('Missing field', err.args[0], event),
89 err
90 )
91 except ValueError as err:
92 # py2: plain ValueError
93 # py3: json.JSONDecodeError, a subclass of ValueError
94 six.raise_from(
95 MalformedEventError('Bad JSON', data),
96 err
97 )
98
99 handler_name = 'on_' + name
100 try:
101 handler = getattr(self, handler_name)
102 except AttributeError:
103 log.warn("Unhandled event '%s'", name)
104 else:
105 # TODO: allow handlers to return/raise to stop streaming cleanly
106 handler(payload)
107
Powered by cgit v1.2.3 (git 2.41.0)