diff options
Diffstat (limited to 'mastodon/streaming.py')
-rw-r--r-- | mastodon/streaming.py | 46 |
1 files changed, 26 insertions, 20 deletions
diff --git a/mastodon/streaming.py b/mastodon/streaming.py index d55ad54..1c73f48 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py | |||
@@ -6,7 +6,8 @@ https://github.com/tootsuite/mastodon/blob/master/docs/Using-the-API/Streaming-A | |||
6 | import json | 6 | import json |
7 | import six | 7 | import six |
8 | from mastodon import Mastodon | 8 | from mastodon import Mastodon |
9 | from mastodon.Mastodon import MastodonMalformedEventError | 9 | from mastodon.Mastodon import MastodonMalformedEventError, MastodonNetworkError |
10 | from requests.exceptions import ChunkedEncodingError | ||
10 | 11 | ||
11 | class StreamListener(object): | 12 | class StreamListener(object): |
12 | """Callbacks for the streaming API. Create a subclass, override the on_xxx | 13 | """Callbacks for the streaming API. Create a subclass, override the on_xxx |
@@ -43,25 +44,31 @@ class StreamListener(object): | |||
43 | """ | 44 | """ |
44 | event = {} | 45 | event = {} |
45 | line_buffer = bytearray() | 46 | line_buffer = bytearray() |
46 | for chunk in response.iter_content(chunk_size = 1): | 47 | try: |
47 | if chunk: | 48 | for chunk in response.iter_content(chunk_size = 1): |
48 | if chunk == b'\n': | 49 | if chunk: |
49 | try: | 50 | if chunk == b'\n': |
50 | line = line_buffer.decode('utf-8') | 51 | try: |
51 | except UnicodeDecodeError as err: | 52 | line = line_buffer.decode('utf-8') |
52 | six.raise_from( | 53 | except UnicodeDecodeError as err: |
53 | MastodonMalformedEventError("Malformed UTF-8"), | 54 | six.raise_from( |
54 | err | 55 | MastodonMalformedEventError("Malformed UTF-8"), |
55 | ) | 56 | err |
56 | if line == '': | 57 | ) |
57 | self._dispatch(event) | 58 | if line == '': |
58 | event = {} | 59 | self._dispatch(event) |
60 | event = {} | ||
61 | else: | ||
62 | event = self._parse_line(line, event) | ||
63 | line_buffer = bytearray() | ||
59 | else: | 64 | else: |
60 | event = self._parse_line(line, event) | 65 | line_buffer.extend(chunk) |
61 | line_buffer = bytearray() | 66 | except ChunkedEncodingError as err: |
62 | else: | 67 | six.raise_from( |
63 | line_buffer.extend(chunk) | 68 | MastodonNetworkError("Server ceased communication."), |
64 | 69 | err | |
70 | ) | ||
71 | |||
65 | def _parse_line(self, line, event): | 72 | def _parse_line(self, line, event): |
66 | if line.startswith(':'): | 73 | if line.startswith(':'): |
67 | self.handle_heartbeat() | 74 | self.handle_heartbeat() |
@@ -102,7 +109,6 @@ class StreamListener(object): | |||
102 | err | 109 | err |
103 | ) | 110 | ) |
104 | else: | 111 | else: |
105 | # TODO: allow handlers to return/raise to stop streaming cleanly | ||
106 | handler(payload) | 112 | handler(payload) |
107 | 113 | ||
108 | class CallbackStreamListener(StreamListener): | 114 | class CallbackStreamListener(StreamListener): |