From 9f9a7826d7bdb30eb394c6fd9f4fdafb06e9bfae Mon Sep 17 00:00:00 2001 From: Lorenz Diener Date: Tue, 19 Dec 2017 13:49:00 +0100 Subject: Fix streaming API to be more stable (closes #117) --- mastodon/Mastodon.py | 4 ++-- mastodon/streaming.py | 58 +++++++++++++++++++++++++++++---------------------- 2 files changed, 35 insertions(+), 27 deletions(-) (limited to 'mastodon') diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index a2f7738..ca4849f 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py @@ -1684,7 +1684,7 @@ class Mastodon: self._thread = threading.current_thread() with closing(connection) as r: try: - listener.handle_stream(r.iter_lines(chunk_size = 1, decode_unicode = True)) + listener.handle_stream(r) except AttributeError as e: if not self.closed: raise e @@ -1699,7 +1699,7 @@ class Mastodon: else: # Blocking, never returns (can only leave via exception) with closing(connection) as r: - listener.handle_stream(r.iter_lines()) + listener.handle_stream(r) def __generate_params(self, params, exclude=[]): """ diff --git a/mastodon/streaming.py b/mastodon/streaming.py index 92a02dc..3df3298 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py @@ -34,38 +34,46 @@ class StreamListener(object): that the connection is still open.""" pass - def handle_stream(self, lines): + def handle_stream(self, response): """ Handles a stream of events from the Mastodon server. When each event is received, the corresponding .on_[name]() method is called. - lines: an iterable of lines of bytes sent by the Mastodon server, as - returned by requests.Response.iter_lines(). + response; a requests response object with the open stream for reading. """ - event = {} - for raw_line in lines: - try: - line = raw_line.decode('utf-8') - except UnicodeDecodeError as err: - six.raise_from( - MastodonMalformedEventError("Malformed UTF-8", line), - err - ) + self.event = {} + line_buffer = bytearray() + for chunk in response.iter_content(chunk_size = 1): + if chunk: + if chunk == b'\n': + self.handle_line(line_buffer) + line_buffer = bytearray() + else: + line_buffer.extend(chunk) + + def handle_line(self, raw_line): + try: + line = raw_line.decode('utf-8') + except UnicodeDecodeError as err: + six.raise_from( + MastodonMalformedEventError("Malformed UTF-8", line), + err + ) - if line.startswith(':'): - self.handle_heartbeat() - elif line == '': - # end of event - self._dispatch(event) - event = {} + if line.startswith(':'): + self.handle_heartbeat() + elif line == '': + # end of event + self._dispatch(self.event) + self.event = {} + else: + key, value = line.split(': ', 1) + # According to the MDN spec, repeating the 'data' key + # represents a newline(!) + if key in self.event: + self.event[key] += '\n' + value else: - key, value = line.split(': ', 1) - # According to the MDN spec, repeating the 'data' key - # represents a newline(!) - if key in event: - event[key] += '\n' + value - else: - event[key] = value + self.event[key] = value def _dispatch(self, event): try: -- cgit v1.2.3