aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChronister <[email protected]>2017-08-12 22:21:37 -0700
committerChronister <[email protected]>2017-08-12 22:21:37 -0700
commita6a1ddbed1d490a5cf73c315c972ffabe94d8cee (patch)
tree7093a8a645c73263996ab51a7081a78d3ad9a13a
parentfccc4e19866442c609ddddf9d112c6c14ab20d1e (diff)
downloadmastodon.py-a6a1ddbed1d490a5cf73c315c972ffabe94d8cee.tar.gz
Add async parameter to streaming API calls. If true, calls the streaming API on a separate thread and returns the Response object to the user so they can close it at their discretion.
-rw-r--r--mastodon/Mastodon.py90
1 files changed, 64 insertions, 26 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py
index 8aa741a..819d252 100644
--- a/mastodon/Mastodon.py
+++ b/mastodon/Mastodon.py
@@ -16,6 +16,7 @@ import dateutil
16import dateutil.parser 16import dateutil.parser
17import re 17import re
18import copy 18import copy
19import threading
19 20
20class Mastodon: 21class Mastodon:
21 """ 22 """
@@ -818,46 +819,63 @@ class Mastodon:
818 ### 819 ###
819 # Streaming 820 # Streaming
820 ### 821 ###
821 def user_stream(self, listener): 822 def user_stream(self, listener, async=False):
822 """ 823 """
823 Streams events that are relevant to the authorized user, i.e. home 824 Streams events that are relevant to the authorized user, i.e. home
824 timeline and notifications. 'listener' should be a subclass of 825 timeline and notifications. 'listener' should be a subclass of
825 StreamListener. 826 StreamListener which will receive callbacks for incoming events.
826 827
827 This method blocks forever, calling callbacks on 'listener' for 828 If async is False, this method blocks forever.
828 incoming events. 829
830 If async is True, 'listener' will listen on another thread and this method
831 will return a requests.Response instance corresponding to the open
832 connection. The connection may be closed at any time by calling its
833 close() method.
829 """ 834 """
830 return self.__stream('/api/v1/streaming/user', listener) 835 return self.__stream('/api/v1/streaming/user', listener, async=async)
831 836
832 def public_stream(self, listener): 837 def public_stream(self, listener, async=False):
833 """ 838 """
834 Streams public events. 'listener' should be a subclass of 839 Streams public events. 'listener' should be a subclass of StreamListener
835 StreamListener. 840 which will receive callbacks for incoming events.
841
842 If async is False, this method blocks forever.
836 843
837 This method blocks forever, calling callbacks on 'listener' for 844 If async is True, 'listener' will listen on another thread and this method
838 incoming events. 845 will return a requests.Response instance corresponding to the open
846 connection. The connection may be closed at any time by calling its
847 close() method.
839 """ 848 """
840 return self.__stream('/api/v1/streaming/public', listener) 849 return self.__stream('/api/v1/streaming/public', listener, async=async)
841 850
842 def local_stream(self, listener): 851 def local_stream(self, listener, async=False):
843 """ 852 """
844 Streams local events. 'listener' should be a subclass of 853 Streams local events. 'listener' should be a subclass of StreamListener
845 StreamListener. 854 which will receive callbacks for incoming events.
855
856 If async is False, this method blocks forever.
846 857
847 This method blocks forever, calling callbacks on 'listener' for 858 If async is True, 'listener' will listen on another thread and this method
848 incoming events. 859 will return a requests.Response instance corresponding to the open
860 connection. The connection may be closed at any time by calling its
861 close() method.
849 """ 862 """
850 return self.__stream('/api/v1/streaming/public/local', listener) 863 return self.__stream('/api/v1/streaming/public/local', listener, async=async)
851 864
852 def hashtag_stream(self, tag, listener): 865 def hashtag_stream(self, tag, listener, async=False):
853 """ 866 """
854 Returns all public statuses for the hashtag 'tag'. 'listener' should be 867 Returns all public statuses for the hashtag 'tag'. 'listener' should be
855 a subclass of StreamListener. 868 a subclass of StreamListener which will receive callbacks for incoming
869 events.
856 870
857 This method blocks forever, calling callbacks on 'listener' for 871 If async is False, this method blocks forever.
858 incoming events. 872
873 If async is True, 'listener' will listen on another thread and this method
874 will return a requests.Response instance corresponding to the open
875 connection. The connection may be closed at any time by calling its
876 close() method.
859 """ 877 """
860 return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) 878 return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async)
861 879
862 ### 880 ###
863 # Internal helpers, dragons probably 881 # Internal helpers, dragons probably
@@ -1017,19 +1035,39 @@ class Mastodon:
1017 1035
1018 return response 1036 return response
1019 1037
1020 def __stream(self, endpoint, listener, params = {}): 1038
1039 def __stream(self, endpoint, listener, params = {}, async=False):
1021 """ 1040 """
1022 Internal streaming API helper. 1041 Internal streaming API helper.
1042
1043 Returns the requests.Response instance corresponding to the open websocket
1044 connection.
1023 """ 1045 """
1024 1046
1025 headers = {} 1047 headers = {}
1026 if self.access_token != None: 1048 if self.access_token != None:
1027 headers = {'Authorization': 'Bearer ' + self.access_token} 1049 headers = {'Authorization': 'Bearer ' + self.access_token}
1028
1029 url = self.api_base_url + endpoint 1050 url = self.api_base_url + endpoint
1030 with closing(requests.get(url, headers = headers, data = params, stream = True)) as r:
1031 listener.handle_stream(r.iter_lines())
1032 1051
1052 connection = requests.get(url, headers = headers, data = params, stream = True)
1053
1054 def __stream_threadproc():
1055 with closing(connection) as r:
1056 try:
1057 listener.handle_stream(r.iter_lines())
1058 except AttributeError as e:
1059 # TODO If the user closes the connection early, requests gets
1060 # confused and throws an AttributeError
1061 pass
1062 return 0
1063
1064 if async:
1065 t = threading.Thread(args=(), target=__stream_threadproc)
1066 t.start()
1067 return connection
1068 else:
1069 # Blocking, never returns (can only leave via exception)
1070 return __stream_threadproc()
1033 1071
1034 def __generate_params(self, params, exclude = []): 1072 def __generate_params(self, params, exclude = []):
1035 """ 1073 """
Powered by cgit v1.2.3 (git 2.41.0)