diff options
-rw-r--r-- | docs/index.rst | 3 | ||||
-rw-r--r-- | mastodon/streaming.py | 60 |
2 files changed, 43 insertions, 20 deletions
diff --git a/docs/index.rst b/docs/index.rst index 56e0756..fc8c003 100644 --- a/docs/index.rst +++ b/docs/index.rst | |||
@@ -815,7 +815,8 @@ will return a handle corresponding to the open connection. If, in addition, `asy | |||
815 | the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting | 815 | the thread will attempt to reconnect to the streaming API if any errors are encountered, waiting |
816 | `async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made | 816 | `async_reconnect_wait_sec` seconds between reconnection attempts. Note that no effort is made |
817 | to "catch up" - events created while the connection is broken will not be received. If you need to make | 817 | to "catch up" - events created while the connection is broken will not be received. If you need to make |
818 | sure to get absolutely all notifications / deletes / toots, you will have to do that manually. | 818 | sure to get absolutely all notifications / deletes / toots, you will have to do that manually, e.g. |
819 | using the `on_abort` handler to fill in events since the last received one and then reconnecting. | ||
819 | 820 | ||
820 | The connection may be closed at any time by calling the handles close() method. The | 821 | The connection may be closed at any time by calling the handles close() method. The |
821 | current status of the handler thread can be checked with the handles is_alive() function, | 822 | current status of the handler thread can be checked with the handles is_alive() function, |
diff --git a/mastodon/streaming.py b/mastodon/streaming.py index 8c3ec19..65ec30a 100644 --- a/mastodon/streaming.py +++ b/mastodon/streaming.py | |||
@@ -25,8 +25,15 @@ class StreamListener(object): | |||
25 | describing the notification.""" | 25 | describing the notification.""" |
26 | pass | 26 | pass |
27 | 27 | ||
28 | def on_abort(self): | 28 | def on_abort(self, err): |
29 | """There was a connection error or read timeout.""" | 29 | """There was a connection error, read timeout or other error fatal to |
30 | the streaming connection. The exception object about to be raised | ||
31 | is passed to this function for reference. | ||
32 | |||
33 | Note that the exception will be raised properly once you return from this | ||
34 | function, so if you are using this handler to reconnect, either never | ||
35 | return or start a thread and then catch and ignore the exception. | ||
36 | """ | ||
30 | pass | 37 | pass |
31 | 38 | ||
32 | def on_delete(self, status_id): | 39 | def on_delete(self, status_id): |
@@ -55,8 +62,10 @@ class StreamListener(object): | |||
55 | try: | 62 | try: |
56 | line = line_buffer.decode('utf-8') | 63 | line = line_buffer.decode('utf-8') |
57 | except UnicodeDecodeError as err: | 64 | except UnicodeDecodeError as err: |
65 | exception = MastodonMalformedEventError("Malformed UTF-8") | ||
66 | self.on_abort(exception) | ||
58 | six.raise_from( | 67 | six.raise_from( |
59 | MastodonMalformedEventError("Malformed UTF-8"), | 68 | exception, |
60 | err | 69 | err |
61 | ) | 70 | ) |
62 | if line == '': | 71 | if line == '': |
@@ -68,15 +77,17 @@ class StreamListener(object): | |||
68 | else: | 77 | else: |
69 | line_buffer.extend(chunk) | 78 | line_buffer.extend(chunk) |
70 | except ChunkedEncodingError as err: | 79 | except ChunkedEncodingError as err: |
71 | self.on_abort() | 80 | exception = MastodonNetworkError("Server ceased communication.") |
81 | self.on_abort(exception) | ||
72 | six.raise_from( | 82 | six.raise_from( |
73 | MastodonNetworkError("Server ceased communication."), | 83 | exception, |
74 | err | 84 | err |
75 | ) | 85 | ) |
76 | except MastodonReadTimeout as err: | 86 | except MastodonReadTimeout as err: |
77 | self.on_abort() | 87 | exception = MastodonReadTimeout("Timed out while reading from server."), |
88 | self.on_abort(exception) | ||
78 | six.raise_from( | 89 | six.raise_from( |
79 | MastodonReadTimeout("Timed out while reading from server."), | 90 | exception, |
80 | err | 91 | err |
81 | ) | 92 | ) |
82 | 93 | ||
@@ -84,7 +95,12 @@ class StreamListener(object): | |||
84 | if line.startswith(':'): | 95 | if line.startswith(':'): |
85 | self.handle_heartbeat() | 96 | self.handle_heartbeat() |
86 | else: | 97 | else: |
87 | key, value = line.split(': ', 1) | 98 | try: |
99 | key, value = line.split(': ', 1) | ||
100 | except: | ||
101 | exception = MastodonMalformedEventError("Malformed event.") | ||
102 | self.on_abort(exception) | ||
103 | raise exception | ||
88 | # According to the MDN spec, repeating the 'data' key | 104 | # According to the MDN spec, repeating the 'data' key |
89 | # represents a newline(!) | 105 | # represents a newline(!) |
90 | if key in event: | 106 | if key in event: |
@@ -99,24 +115,30 @@ class StreamListener(object): | |||
99 | data = event['data'] | 115 | data = event['data'] |
100 | payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks) | 116 | payload = json.loads(data, object_hook = Mastodon._Mastodon__json_hooks) |
101 | except KeyError as err: | 117 | except KeyError as err: |
102 | six.raise_from( | 118 | exception = MastodonMalformedEventError('Missing field', err.args[0], event) |
103 | MastodonMalformedEventError('Missing field', err.args[0], event), | 119 | self.on_abort(exception) |
104 | err | 120 | six.raise_from( |
105 | ) | 121 | exception, |
122 | err | ||
123 | ) | ||
106 | except ValueError as err: | 124 | except ValueError as err: |
107 | # py2: plain ValueError | 125 | # py2: plain ValueError |
108 | # py3: json.JSONDecodeError, a subclass of ValueError | 126 | # py3: json.JSONDecodeError, a subclass of ValueError |
109 | six.raise_from( | 127 | exception = MastodonMalformedEventError('Bad JSON', data) |
110 | MastodonMalformedEventError('Bad JSON', data), | 128 | self.on_abort(exception) |
111 | err | 129 | six.raise_from( |
112 | ) | 130 | exception, |
131 | err | ||
132 | ) | ||
113 | 133 | ||
114 | handler_name = 'on_' + name | 134 | handler_name = 'on_' + name |
115 | try: | 135 | try: |
116 | handler = getattr(self, handler_name) | 136 | handler = getattr(self, handler_name) |
117 | except AttributeError as err: | 137 | except AttributeError as err: |
138 | exception = MastodonMalformedEventError('Bad event type', name) | ||
139 | self.on_abort(exception) | ||
118 | six.raise_from( | 140 | six.raise_from( |
119 | MastodonMalformedEventError('Bad event type', name), | 141 | exception, |
120 | err | 142 | err |
121 | ) | 143 | ) |
122 | else: | 144 | else: |