aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/index.rst3
-rw-r--r--mastodon/streaming.py60
2 files changed, 43 insertions, 20 deletions
diff --git a/docs/index.rst b/docs/index.rst
index 56e0756..fc8c003 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -815,7 +815,8 @@ will return a handle corresponding to the open connection. If, in addition, `asy
815the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting 815the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting
816`async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made 816`async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made
817to "catch up" - events created while the connection is broken will not be received. If you need to make 817to "catch up" - events created while the connection is broken will not be received. If you need to make
818sure to get absolutely all notifications / deletes / toots, you will have to do that manually. 818sure to get absolutely all notifications / deletes / toots, you will have to do that manually, e.g.
819using the `on_abort` handler to fill in events since the last received one and then reconnecting.
819 820
820The connection may be closed at any time by calling the handles close() method. The 821The connection may be closed at any time by calling the handles close() method. The
821current status of the handler thread can be checked with the handles is_alive() function, 822current status of the handler thread can be checked with the handles is_alive() function,
diff --git a/mastodon/streaming.py b/mastodon/streaming.py
index 8c3ec19..65ec30a 100644
--- a/mastodon/streaming.py
+++ b/mastodon/streaming.py
@@ -25,8 +25,15 @@ class StreamListener(object):
25 describing the notification.""" 25 describing the notification."""
26 pass 26 pass
27 27
28 def on_abort(self): 28 def on_abort(self, err):
29 """There was a connection error or read timeout.""" 29 """There was a connection error, read timeout or other error fatal to
30 the streaming connection. The exception object about to be raised
31 is passed to this function for reference.
32
33 Note that the exception will be raised properly once you return from this
34 function, so if you are using this handler to reconnect, either never
35 return or start a thread and then catch and ignore the exception.
36 """
30 pass 37 pass
31 38
32 def on_delete(self, status_id): 39 def on_delete(self, status_id):
@@ -55,8 +62,10 @@ class StreamListener(object):
55 try: 62 try:
56 line = line_buffer.decode('utf-8') 63 line = line_buffer.decode('utf-8')
57 except UnicodeDecodeError as err: 64 except UnicodeDecodeError as err:
65 exception = MastodonMalformedEventError("Malformed UTF-8")
66 self.on_abort(exception)
58 six.raise_from( 67 six.raise_from(
59 MastodonMalformedEventError("Malformed UTF-8"), 68 exception,
60 err 69 err
61 ) 70 )
62 if line == '': 71 if line == '':
@@ -68,15 +77,17 @@ class StreamListener(object):
68 else: 77 else:
69 line_buffer.extend(chunk) 78 line_buffer.extend(chunk)
70 except ChunkedEncodingError as err: 79 except ChunkedEncodingError as err:
71 self.on_abort() 80 exception = MastodonNetworkError("Server ceased communication.")
81 self.on_abort(exception)
72 six.raise_from( 82 six.raise_from(
73 MastodonNetworkError("Server ceased communication."), 83 exception,
74 err 84 err
75 ) 85 )
76 except MastodonReadTimeout as err: 86 except MastodonReadTimeout as err:
77 self.on_abort() 87 exception = MastodonReadTimeout("Timed out while reading from server."),
88 self.on_abort(exception)
78 six.raise_from( 89 six.raise_from(
79 MastodonReadTimeout("Timed out while reading from server."), 90 exception,
80 err 91 err
81 ) 92 )
82 93
@@ -84,7 +95,12 @@ class StreamListener(object):
84 if line.startswith(':'): 95 if line.startswith(':'):
85 self.handle_heartbeat() 96 self.handle_heartbeat()
86 else: 97 else:
87 key, value = line.split(': ', 1) 98 try:
99 key, value = line.split(': ', 1)
100 except:
101 exception = MastodonMalformedEventError("Malformed event.")
102 self.on_abort(exception)
103 raise exception
88 # According to the MDN spec, repeating the 'data' key 104 # According to the MDN spec, repeating the 'data' key
89 # represents a newline(!) 105 # represents a newline(!)
90 if key in event: 106 if key in event:
@@ -99,24 +115,30 @@ class StreamListener(object):
99 data = event['data'] 115 data = event['data']
100 payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks) 116 payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks)
101 except KeyError as err: 117 except KeyError as err:
102 six.raise_from( 118 exception = MastodonMalformedEventError('Missing field', err.args[0], event)
103 MastodonMalformedEventError('Missing field', err.args[0], event), 119 self.on_abort(exception)
104 err 120 six.raise_from(
105 ) 121 exception,
122 err
123 )
106 except ValueError as err: 124 except ValueError as err:
107 # py2: plain ValueError 125 # py2: plain ValueError
108 # py3: json.JSONDecodeError, a subclass of ValueError 126 # py3: json.JSONDecodeError, a subclass of ValueError
109 six.raise_from( 127 exception = MastodonMalformedEventError('Bad JSON', data)
110 MastodonMalformedEventError('Bad JSON', data), 128 self.on_abort(exception)
111 err 129 six.raise_from(
112 ) 130 exception,
131 err
132 )
113 133
114 handler_name = 'on_' + name 134 handler_name = 'on_' + name
115 try: 135 try:
116 handler = getattr(self, handler_name) 136 handler = getattr(self, handler_name)
117 except AttributeError as err: 137 except AttributeError as err:
138 exception = MastodonMalformedEventError('Bad event type', name)
139 self.on_abort(exception)
118 six.raise_from( 140 six.raise_from(
119 MastodonMalformedEventError('Bad event type', name), 141 exception,
120 err 142 err
121 ) 143 )
122 else: 144 else:
Powered by cgit v1.2.3 (git 2.41.0)