aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mastodon/Mastodon.py4
-rw-r--r--mastodon/streaming.py58
2 files changed, 35 insertions, 27 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index a2f7738..ca4849f 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -1684,7 +1684,7 @@ class Mastodon:
1684 self._thread = threading.current_thread() 1684 self._thread = threading.current_thread()
1685 with closing(connection) as r: 1685 with closing(connection) as r:
1686 try: 1686 try:
1687 listener.handle_stream(r.iter_lines(chunk_size = 1, decode_unicode = True)) 1687 listener.handle_stream(r)
1688 except AttributeError as e: 1688 except AttributeError as e:
1689 if not self.closed: 1689 if not self.closed:
1690 raise e 1690 raise e
@@ -1699,7 +1699,7 @@ class Mastodon:
1699 else: 1699 else:
1700 # Blocking, never returns (can only leave via exception) 1700 # Blocking, never returns (can only leave via exception)
1701 with closing(connection) as r: 1701 with closing(connection) as r:
1702 listener.handle_stream(r.iter_lines()) 1702 listener.handle_stream(r)
1703 1703
1704 def __generate_params(self, params, exclude=[]): 1704 def __generate_params(self, params, exclude=[]):
1705 """ 1705 """
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
index 92a02dc..3df3298 100644
--- a/mastodon/streaming.py
+++ b/mastodon/streaming.py
@@ -34,38 +34,46 @@ class StreamListener(object):
34 that the connection is still open.""" 34 that the connection is still open."""
35 pass 35 pass
36 36
37 def handle_stream(self, lines): 37 def handle_stream(self, response):
38 """ 38 """
39 Handles a stream of events from the Mastodon server. When each event 39 Handles a stream of events from the Mastodon server. When each event
40 is received, the corresponding .on_[name]() method is called. 40 is received, the corresponding .on_[name]() method is called.
41 41
42 lines: an iterable of lines of bytes sent by the Mastodon server, as 42 response; a requests response object with the open stream for reading.
43 returned by requests.Response.iter_lines().
44 """ 43 """
45 event = {} 44 self.event = {}
46 for raw_line in lines: 45 line_buffer = bytearray()
47 try: 46 for chunk in response.iter_content(chunk_size = 1):
48 line = raw_line.decode('utf-8') 47 if chunk:
49 except UnicodeDecodeError as err: 48 if chunk == b'\n':
50 six.raise_from( 49 self.handle_line(line_buffer)
51 MastodonMalformedEventError("Malformed UTF-8", line), 50 line_buffer = bytearray()
52 err 51 else:
53 ) 52 line_buffer.extend(chunk)
53
54 def handle_line(self, raw_line):
55 try:
56 line = raw_line.decode('utf-8')
57 except UnicodeDecodeError as err:
58 six.raise_from(
59 MastodonMalformedEventError("Malformed UTF-8", line),
60 err
61 )
54 62
55 if line.startswith(':'): 63 if line.startswith(':'):
56 self.handle_heartbeat() 64 self.handle_heartbeat()
57 elif line == '': 65 elif line == '':
58 # end of event 66 # end of event
59 self._dispatch(event) 67 self._dispatch(self.event)
60 event = {} 68 self.event = {}
69 else:
70 key, value = line.split(': ', 1)
71 # According to the MDN spec, repeating the 'data' key
72 # represents a newline(!)
73 if key in self.event:
74 self.event[key] += '\n' + value
61 else: 75 else:
62 key, value = line.split(': ', 1) 76 self.event[key] = value
63 # According to the MDN spec, repeating the 'data' key
64 # represents a newline(!)
65 if key in event:
66 event[key] += '\n' + value
67 else:
68 event[key] = value
69 77
70 def _dispatch(self, event): 78 def _dispatch(self, event):
71 try: 79 try:
Powered by cgit v1.2.3 (git 2.41.0)