aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenz Diener <[email protected]>2018-04-17 16:06:00 +0200
committerGitHub <[email protected]>2018-04-17 16:06:00 +0200
commit2afc50c803177fc282fac1a19fcd40e175d55f2d (patch)
tree7ce50a932d74a3ad902b6b4b4613c18b33937bb5 /mastodon/Mastodon.py
parent06a7a875fe2705044b98f3ef285944b5513be7de (diff)
parent864c83fa2fe7e09125a24f4a12acb4b1e285bc02 (diff)
downloadmastodon.py-2afc50c803177fc282fac1a19fcd40e175d55f2d.tar.gz
Merge branch 'master' into stream-timeout
Diffstat (limited to 'mastodon/Mastodon.py')
-rw-r--r--mastodon/Mastodon.py96
1 files changed, 68 insertions, 28 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index ea54649..5461661 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -90,6 +90,7 @@ class Mastodon:
90 __DEFAULT_BASE_URL = 'https://mastodon.social' 90 __DEFAULT_BASE_URL = 'https://mastodon.social'
91 __DEFAULT_TIMEOUT = 300 91 __DEFAULT_TIMEOUT = 300
92 __DEFAULT_STREAM_TIMEOUT = 300 92 __DEFAULT_STREAM_TIMEOUT = 300
93 __DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
93 __SUPPORTED_MASTODON_VERSION = "2.2.0" 94 __SUPPORTED_MASTODON_VERSION = "2.2.0"
94 95
95 ### 96 ###
@@ -1388,45 +1389,45 @@ class Mastodon:
1388 # Streaming 1389 # Streaming
1389 ### 1390 ###
1390 @api_version("1.1.0", "1.4.2") 1391 @api_version("1.1.0", "1.4.2")
1391 def stream_user(self, listener, async=False): 1392 def stream_user(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1392 """ 1393 """
1393 Streams events that are relevant to the authorized user, i.e. home 1394 Streams events that are relevant to the authorized user, i.e. home
1394 timeline and notifications. 1395 timeline and notifications.
1395 """ 1396 """
1396 return self.__stream('/api/v1/streaming/user', listener, async=async) 1397 return self.__stream('/api/v1/streaming/user', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1397 1398
1398 @api_version("1.1.0", "1.4.2") 1399 @api_version("1.1.0", "1.4.2")
1399 def stream_public(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1400 def stream_public(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1400 """ 1401 """
1401 Streams public events. 1402 Streams public events.
1402 """ 1403 """
1403 return self.__stream('/api/v1/streaming/public', listener, async=async, timeout=timeout) 1404 return self.__stream('/api/v1/streaming/public', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1404 1405
1405 @api_version("1.1.0", "1.4.2") 1406 @api_version("1.1.0", "1.4.2")
1406 def stream_local(self, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1407 def stream_local(self, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1407 """ 1408 """
1408 Streams local public events. 1409 Streams local public events.
1409 """ 1410 """
1410 return self.__stream('/api/v1/streaming/public/local', listener, async=async, timeout=timeout) 1411 return self.__stream('/api/v1/streaming/public/local', listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1411 1412
1412 @api_version("1.1.0", "1.4.2") 1413 @api_version("1.1.0", "1.4.2")
1413 def stream_hashtag(self, tag, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1414 def stream_hashtag(self, tag, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1414 """ 1415 """
1415 Stream for all public statuses for the hashtag 'tag' seen by the connected 1416 Stream for all public statuses for the hashtag 'tag' seen by the connected
1416 instance. 1417 instance.
1417 """ 1418 """
1418 if tag.startswith("#"): 1419 if tag.startswith("#"):
1419 raise MastodonIllegalArgumentError("Tag parameter should omit leading #") 1420 raise MastodonIllegalArgumentError("Tag parameter should omit leading #")
1420 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, async=async, timeout=timeout) 1421 return self.__stream("/api/v1/streaming/hashtag?tag={}".format(tag), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1421 1422
1422 @api_version("2.1.0", "2.1.0") 1423 @api_version("2.1.0", "2.1.0")
1423 def stream_list(self, id, listener, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1424 def stream_list(self, id, listener, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1424 """ 1425 """
1425 Stream events for the current user, restricted to accounts on the given 1426 Stream events for the current user, restricted to accounts on the given
1426 list. 1427 list.
1427 """ 1428 """
1428 id = self.__unpack_id(id) 1429 id = self.__unpack_id(id)
1429 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, async=async, timeout=timeout) 1430 return self.__stream("/api/v1/streaming/list?list={}".format(id), listener, run_async=run_async, timeout=timeout, reconnect_async=reconnect_async, reconnect_async_wait_sec=reconnect_async_wait_sec)
1430 1431
1431 ### 1432 ###
1432 # Internal helpers, dragons probably 1433 # Internal helpers, dragons probably
@@ -1668,7 +1669,7 @@ class Mastodon:
1668 1669
1669 return response 1670 return response
1670 1671
1671 def __stream(self, endpoint, listener, params={}, async=False, timeout=__DEFAULT_STREAM_TIMEOUT): 1672 def __stream(self, endpoint, listener, params={}, run_async=False, timeout=__DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=__DEFAULT_STREAM_RECONNECT_WAIT_SEC):
1672 """ 1673 """
1673 Internal streaming API helper. 1674 Internal streaming API helper.
1674 1675
@@ -1696,19 +1697,29 @@ class Mastodon:
1696 # The streaming server can't handle two slashes in a path, so remove trailing slashes 1697 # The streaming server can't handle two slashes in a path, so remove trailing slashes
1697 if url[-1] == '/': 1698 if url[-1] == '/':
1698 url = url[:-1] 1699 url = url[:-1]
1699 1700
1700 headers = {"Authorization": "Bearer " + self.access_token} 1701 # Connect function (called and then potentially passed to async handler)
1701 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True, 1702 def connect_func():
1703 headers = {"Authorization": "Bearer " + self.access_token}
1704 connection = requests.get(url + endpoint, headers = headers, data = params, stream = True,
1702 timeout=(self.request_timeout, timeout)) 1705 timeout=(self.request_timeout, timeout))
1703 1706
1704 if connection.status_code != 200: 1707 if connection.status_code != 200:
1705 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason) 1708 raise MastodonNetworkError("Could not connect to streaming server: %s" % connection.reason)
1706 1709 return connection
1710 connection = connect_func()
1711
1712 # Async stream handler
1707 class __stream_handle(): 1713 class __stream_handle():
1708 def __init__(self, connection): 1714 def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
1709 self.closed = False 1715 self.closed = False
1716 self.running = True
1710 self.connection = connection 1717 self.connection = connection
1711 1718 self.connect_func = connect_func
1719 self.reconnect_async = reconnect_async
1720 self.reconnect_async_wait_sec = reconnect_async_wait_sec
1721 self.reconnecting = False
1722
1712 def close(self): 1723 def close(self):
1713 self.closed = True 1724 self.closed = True
1714 self.connection.close() 1725 self.connection.close()
@@ -1716,19 +1727,48 @@ class Mastodon:
1716 def is_alive(self): 1727 def is_alive(self):
1717 return self._thread.is_alive() 1728 return self._thread.is_alive()
1718 1729
1730 def is_receiving(self):
1731 if self.closed or not self.running or self.reconnecting or not self.is_alive():
1732 return False
1733 else:
1734 return True
1735
1719 def _threadproc(self): 1736 def _threadproc(self):
1720 self._thread = threading.current_thread() 1737 self._thread = threading.current_thread()
1721 with closing(connection) as r: 1738
1722 try: 1739 # Run until closed or until error if not autoreconnecting
1723 listener.handle_stream(r) 1740 while self.running:
1724 except AttributeError as e: 1741 with closing(self.connection) as r:
1725 if not self.closed: 1742 try:
1726 raise e 1743 listener.handle_stream(r)
1744 except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
1745 if not (self.closed or self.reconnect_async):
1746 raise e
1747 else:
1748 if self.closed:
1749 self.running = False
1750
1751 # Reconnect loop. Try immediately once, then with delays on error.
1752 if self.reconnect_async and not self.closed:
1753 self.reconnecting = True
1754 connect_success = False
1755 while not connect_success:
1756 connect_success = True
1757 try:
1758 self.connection = self.connect_func()
1759 if self.connection.status_code != 200:
1760 time.sleep(self.reconnect_async_wait_sec)
1761 connect_success = False
1762 except:
1763 time.sleep(self.reconnect_async_wait_sec)
1764 connect_success = False
1765 self.reconnecting = False
1766 else:
1767 self.running = False
1727 return 0 1768 return 0
1728 1769
1729 handle = __stream_handle(connection) 1770 if run_async:
1730 1771 handle = __stream_handle(connection, connect_func, reconnect_async, reconnect_async_wait_sec)
1731 if async:
1732 t = threading.Thread(args=(), daemon = True, target=handle._threadproc) 1772 t = threading.Thread(args=(), daemon = True, target=handle._threadproc)
1733 t.start() 1773 t.start()
1734 return handle 1774 return handle
Powered by cgit v1.2.3 (git 2.41.0)