diff options
author | Chronister <[email protected]> | 2017-08-12 22:21:37 -0700 |
---|---|---|
committer | Chronister <[email protected]> | 2017-08-12 22:21:37 -0700 |
commit | a6a1ddbed1d490a5cf73c315c972ffabe94d8cee (patch) | |
tree | 7093a8a645c73263996ab51a7081a78d3ad9a13a | |
parent | fccc4e19866442c609ddddf9d112c6c14ab20d1e (diff) | |
download | mastodon.py-a6a1ddbed1d490a5cf73c315c972ffabe94d8cee.tar.gz |
Add async parameter to streaming API calls. If true, calls the streaming API on a separate thread and returns the Response object to the user so they can close it at their discretion.
-rw-r--r-- | mastodon/Mastodon.py | 90 |
1 files changed, 64 insertions, 26 deletions
diff --git a/mastodon/Mastodon.py b/mastodon/Mastodon.py index 8aa741a..819d252 100644 --- a/mastodon/Mastodon.py +++ b/mastodon/Mastodon.py | |||
@@ -16,6 +16,7 @@ import dateutil | |||
16 | import dateutil.parser | 16 | import dateutil.parser |
17 | import re | 17 | import re |
18 | import copy | 18 | import copy |
19 | import threading | ||
19 | 20 | ||
20 | class Mastodon: | 21 | class Mastodon: |
21 | """ | 22 | """ |
@@ -818,46 +819,63 @@ class Mastodon: | |||
818 | ### | 819 | ### |
819 | # Streaming | 820 | # Streaming |
820 | ### | 821 | ### |
821 | def user_stream(self, listener): | 822 | def user_stream(self, listener, async=False): |
822 | """ | 823 | """ |
823 | Streams events that are relevant to the authorized user, i.e. home | 824 | Streams events that are relevant to the authorized user, i.e. home |
824 | timeline and notifications. 'listener' should be a subclass of | 825 | timeline and notifications. 'listener' should be a subclass of |
825 | StreamListener. | 826 | StreamListener which will receive callbacks for incoming events. |
826 | 827 | ||
827 | This method blocks forever, calling callbacks on 'listener' for | 828 | If async is False, this method blocks forever. |
828 | incoming events. | 829 | |
830 | If async is True, 'listener' will listen on another thread and this method | ||
831 | will return a requests.Response instance corresponding to the open | ||
832 | connection. The connection may be closed at any time by calling its | ||
833 | close() method. | ||
829 | """ | 834 | """ |
830 | return self.__stream('/api/v1/streaming/user', listener) | 835 | return self.__stream('/api/v1/streaming/user', listener, async=async) |
831 | 836 | ||
832 | def public_stream(self, listener): | 837 | def public_stream(self, listener, async=False): |
833 | """ | 838 | """ |
834 | Streams public events. 'listener' should be a subclass of | 839 | Streams public events. 'listener' should be a subclass of StreamListener |
835 | StreamListener. | 840 | which will receive callbacks for incoming events. |
841 | |||
842 | If async is False, this method blocks forever. | ||
836 | 843 | ||
837 | This method blocks forever, calling callbacks on 'listener' for | 844 | If async is True, 'listener' will listen on another thread and this method |
838 | incoming events. | 845 | will return a requests.Response instance corresponding to the open |
846 | connection. The connection may be closed at any time by calling its | ||
847 | close() method. | ||
839 | """ | 848 | """ |
840 | return self.__stream('/api/v1/streaming/public', listener) | 849 | return self.__stream('/api/v1/streaming/public', listener, async=async) |
841 | 850 | ||
842 | def local_stream(self, listener): | 851 | def local_stream(self, listener, async=False): |
843 | """ | 852 | """ |
844 | Streams local events. 'listener' should be a subclass of | 853 | Streams local events. 'listener' should be a subclass of StreamListener |
845 | StreamListener. | 854 | which will receive callbacks for incoming events. |
855 | |||
856 | If async is False, this method blocks forever. | ||
846 | 857 | ||
847 | This method blocks forever, calling callbacks on 'listener' for | 858 | If async is True, 'listener' will listen on another thread and this method |
848 | incoming events. | 859 | will return a requests.Response instance corresponding to the open |
860 | connection. The connection may be closed at any time by calling its | ||
861 | close() method. | ||
849 | """ | 862 | """ |
850 | return self.__stream('/api/v1/streaming/public/local', listener) | 863 | return self.__stream('/api/v1/streaming/public/local', listener, async=async) |
851 | 864 | ||
852 | def hashtag_stream(self, tag, listener): | 865 | def hashtag_stream(self, tag, listener, async=False): |
853 | """ | 866 | """ |
854 | Returns all public statuses for the hashtag 'tag'. 'listener' should be | 867 | Returns all public statuses for the hashtag 'tag'. 'listener' should be |
855 | a subclass of StreamListener. | 868 | a subclass of StreamListener which will receive callbacks for incoming |
869 | events. | ||
856 | 870 | ||
857 | This method blocks forever, calling callbacks on 'listener' for | 871 | If async is False, this method blocks forever. |
858 | incoming events. | 872 | |
873 | If async is True, 'listener' will listen on another thread and this method | ||
874 | will return a requests.Response instance corresponding to the open | ||
875 | connection. The connection may be closed at any time by calling its | ||
876 | close() method. | ||
859 | """ | 877 | """ |
860 | return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}) | 878 | return self.__stream('/api/v1/streaming/hashtag', listener, params={'tag': tag}, async=async) |
861 | 879 | ||
862 | ### | 880 | ### |
863 | # Internal helpers, dragons probably | 881 | # Internal helpers, dragons probably |
@@ -1017,19 +1035,39 @@ class Mastodon: | |||
1017 | 1035 | ||
1018 | return response | 1036 | return response |
1019 | 1037 | ||
1020 | def __stream(self, endpoint, listener, params = {}): | 1038 | |
1039 | def __stream(self, endpoint, listener, params = {}, async=False): | ||
1021 | """ | 1040 | """ |
1022 | Internal streaming API helper. | 1041 | Internal streaming API helper. |
1042 | |||
1043 | Returns the requests.Response instance corresponding to the open websocket | ||
1044 | connection. | ||
1023 | """ | 1045 | """ |
1024 | 1046 | ||
1025 | headers = {} | 1047 | headers = {} |
1026 | if self.access_token != None: | 1048 | if self.access_token != None: |
1027 | headers = {'Authorization': 'Bearer ' + self.access_token} | 1049 | headers = {'Authorization': 'Bearer ' + self.access_token} |
1028 | |||
1029 | url = self.api_base_url + endpoint | 1050 | url = self.api_base_url + endpoint |
1030 | with closing(requests.get(url, headers = headers, data = params, stream = True)) as r: | ||
1031 | listener.handle_stream(r.iter_lines()) | ||
1032 | 1051 | ||
1052 | connection = requests.get(url, headers = headers, data = params, stream = True) | ||
1053 | |||
1054 | def __stream_threadproc(): | ||
1055 | with closing(connection) as r: | ||
1056 | try: | ||
1057 | listener.handle_stream(r.iter_lines()) | ||
1058 | except AttributeError as e: | ||
1059 | # TODO If the user closes the connection early, requests gets | ||
1060 | # confused and throws an AttributeError | ||
1061 | pass | ||
1062 | return 0 | ||
1063 | |||
1064 | if async: | ||
1065 | t = threading.Thread(args=(), target=__stream_threadproc) | ||
1066 | t.start() | ||
1067 | return connection | ||
1068 | else: | ||
1069 | # Blocking, never returns (can only leave via exception) | ||
1070 | return __stream_threadproc() | ||
1033 | 1071 | ||
1034 | def __generate_params(self, params, exclude = []): | 1072 | def __generate_params(self, params, exclude = []): |
1035 | """ | 1073 | """ |