diff options
author | Lorenz Diener <[email protected]> | 2017-12-19 13:49:00 +0100 |
---|---|---|
committer | Lorenz Diener <[email protected]> | 2017-12-19 13:49:00 +0100 |
commit | 9f9a7826d7bdb30eb394c6fd9f4fdafb06e9bfae (patch) | |
tree | cf7fe9a4e2467a4a6c7f9d9fce7a34798ad0512e | |
parent | e5c50ea80d86449b5f95a32cf3a01d7bf8e9d2f4 (diff) | |
download | mastodon.py-9f9a7826d7bdb30eb394c6fd9f4fdafb06e9bfae.tar.gz |
Fix streaming API to be more stable (closes #117)
-rw-r--r-- | mastodon/Mastodon.py | 4 | ||||
-rw-r--r-- | mastodon/streaming.py | 58 |
2 files changed, 35 insertions, 27 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index a2f7738..ca4849f 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py | |||
@@ -1684,7 +1684,7 @@ class Mastodon: | |||
1684 | self._thread = threading.current_thread() | 1684 | self._thread = threading.current_thread() |
1685 | with closing(connection) as r: | 1685 | with closing(connection) as r: |
1686 | try: | 1686 | try: |
1687 | listener.handle_stream(r.iter_lines(chunk_size = 1, decode_unicode = True)) | 1687 | listener.handle_stream(r) |
1688 | except AttributeError as e: | 1688 | except AttributeError as e: |
1689 | if not self.closed: | 1689 | if not self.closed: |
1690 | raise e | 1690 | raise e |
@@ -1699,7 +1699,7 @@ class Mastodon: | |||
1699 | else: | 1699 | else: |
1700 | # Blocking, never returns (can only leave via exception) | 1700 | # Blocking, never returns (can only leave via exception) |
1701 | with closing(connection) as r: | 1701 | with closing(connection) as r: |
1702 | listener.handle_stream(r.iter_lines()) | 1702 | listener.handle_stream(r) |
1703 | 1703 | ||
1704 | def __generate_params(self, params, exclude=[]): | 1704 | def __generate_params(self, params, exclude=[]): |
1705 | """ | 1705 | """ |
diff --git a/mastodon/streaming.py b/mastodon/streaming.py index 92a02dc..3df3298 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py | |||
@@ -34,38 +34,46 @@ class StreamListener(object): | |||
34 | that the connection is still open.""" | 34 | that the connection is still open.""" |
35 | pass | 35 | pass |
36 | 36 | ||
37 | def handle_stream(self, lines): | 37 | def handle_stream(self, response): |
38 | """ | 38 | """ |
39 | Handles a stream of events from the Mastodon server. When each event | 39 | Handles a stream of events from the Mastodon server. When each event |
40 | is received, the corresponding .on_[name]() method is called. | 40 | is received, the corresponding .on_[name]() method is called. |
41 | 41 | ||
42 | lines: an iterable of lines of bytes sent by the Mastodon server, as | 42 | response; a requests response object with the open stream for reading. |
43 | returned by requests.Response.iter_lines(). | ||
44 | """ | 43 | """ |
45 | event = {} | 44 | self.event = {} |
46 | for raw_line in lines: | 45 | line_buffer = bytearray() |
47 | try: | 46 | for chunk in response.iter_content(chunk_size = 1): |
48 | line = raw_line.decode('utf-8') | 47 | if chunk: |
49 | except UnicodeDecodeError as err: | 48 | if chunk == b'\n': |
50 | six.raise_from( | 49 | self.handle_line(line_buffer) |
51 | MastodonMalformedEventError("Malformed UTF-8", line), | 50 | line_buffer = bytearray() |
52 | err | 51 | else: |
53 | ) | 52 | line_buffer.extend(chunk) |
53 | |||
54 | def handle_line(self, raw_line): | ||
55 | try: | ||
56 | line = raw_line.decode('utf-8') | ||
57 | except UnicodeDecodeError as err: | ||
58 | six.raise_from( | ||
59 | MastodonMalformedEventError("Malformed UTF-8", line), | ||
60 | err | ||
61 | ) | ||
54 | 62 | ||
55 | if line.startswith(':'): | 63 | if line.startswith(':'): |
56 | self.handle_heartbeat() | 64 | self.handle_heartbeat() |
57 | elif line == '': | 65 | elif line == '': |
58 | # end of event | 66 | # end of event |
59 | self._dispatch(event) | 67 | self._dispatch(self.event) |
60 | event = {} | 68 | self.event = {} |
69 | else: | ||
70 | key, value = line.split(': ', 1) | ||
71 | # According to the MDN spec, repeating the 'data' key | ||
72 | # represents a newline(!) | ||
73 | if key in self.event: | ||
74 | self.event[key] += '\n' + value | ||
61 | else: | 75 | else: |
62 | key, value = line.split(': ', 1) | 76 | self.event[key] = value |
63 | # According to the MDN spec, repeating the 'data' key | ||
64 | # represents a newline(!) | ||
65 | if key in event: | ||
66 | event[key] += '\n' + value | ||
67 | else: | ||
68 | event[key] = value | ||
69 | 77 | ||
70 | def _dispatch(self, event): | 78 | def _dispatch(self, event): |
71 | try: | 79 | try: |