import requests from ..spec import AbstractFileSystem from ..utils import infer_storage_options from .memory import MemoryFile class GistFileSystem(AbstractFileSystem): """ Interface to files in a single GitHub Gist. Provides read-only access to a gist's files. Gists do not contain subdirectories, so file listing is straightforward. Parameters ---------- gist_id : str The ID of the gist you want to access (the long hex value from the URL). filenames : list[str] (optional) If provided, only make a file system representing these files, and do not fetch the list of all files for this gist. sha : str (optional) If provided, fetch a particular revision of the gist. If omitted, the latest revision is used. username : str (optional) GitHub username for authentication (required if token is given). token : str (optional) GitHub personal access token (required if username is given). timeout : (float, float) or float, optional Connect and read timeouts for requests (default 60s each). kwargs : dict Stored on `self.request_kw` and passed to `requests.get` when fetching Gist metadata or reading ("opening") a file. """ protocol = "gist" gist_url = "https://api.github.com/gists/{gist_id}" gist_rev_url = "https://api.github.com/gists/{gist_id}/{sha}" def __init__( self, gist_id, filenames=None, sha=None, username=None, token=None, timeout=None, **kwargs, ): super().__init__() self.gist_id = gist_id self.filenames = filenames self.sha = sha # revision of the gist (optional) if (username is None) ^ (token is None): # Both or neither must be set if username or token: raise ValueError("Auth requires both username and token, or neither.") self.username = username self.token = token self.request_kw = kwargs # Default timeouts to 60s connect/read if none provided self.timeout = timeout if timeout is not None else (60, 60) # We use a single-level "directory" cache, because a gist is essentially flat self.dircache[""] = self._fetch_file_list() @property def kw(self): """Auth parameters passed to 'requests' if we have username/token.""" if self.username is not None and self.token is not None: return {"auth": (self.username, self.token), **self.request_kw} return self.request_kw def _fetch_gist_metadata(self): """ Fetch the JSON metadata for this gist (possibly for a specific revision). """ if self.sha: url = self.gist_rev_url.format(gist_id=self.gist_id, sha=self.sha) else: url = self.gist_url.format(gist_id=self.gist_id) r = requests.get(url, timeout=self.timeout, **self.kw) if r.status_code == 404: raise FileNotFoundError( f"Gist not found: {self.gist_id}@{self.sha or 'latest'}" ) r.raise_for_status() return r.json() def _fetch_file_list(self): """ Returns a list of dicts describing each file in the gist. These get stored in self.dircache[""]. """ meta = self._fetch_gist_metadata() if self.filenames: available_files = meta.get("files", {}) files = {} for fn in self.filenames: if fn not in available_files: raise FileNotFoundError(fn) files[fn] = available_files[fn] else: files = meta.get("files", {}) out = [] for fname, finfo in files.items(): if finfo is None: # Occasionally GitHub returns a file entry with null if it was deleted continue # Build a directory entry out.append( { "name": fname, # file's name "type": "file", # gists have no subdirectories "size": finfo.get("size", 0), # file size in bytes "raw_url": finfo.get("raw_url"), } ) return out @classmethod def _strip_protocol(cls, path): """ Remove 'gist://' from the path, if present. """ # The default infer_storage_options can handle gist://username:token@id/file # or gist://id/file, but let's ensure we handle a normal usage too. # We'll just strip the protocol prefix if it exists. path = infer_storage_options(path).get("path", path) return path.lstrip("/") @staticmethod def _get_kwargs_from_urls(path): """ Parse 'gist://' style URLs into GistFileSystem constructor kwargs. For example: gist://:TOKEN@/file.txt gist://username:TOKEN@/file.txt """ so = infer_storage_options(path) out = {} if "username" in so and so["username"]: out["username"] = so["username"] if "password" in so and so["password"]: out["token"] = so["password"] if "host" in so and so["host"]: # We interpret 'host' as the gist ID out["gist_id"] = so["host"] # Extract SHA and filename from path if "path" in so and so["path"]: path_parts = so["path"].rsplit("/", 2)[-2:] if len(path_parts) == 2: if path_parts[0]: # SHA present out["sha"] = path_parts[0] if path_parts[1]: # filename also present out["filenames"] = [path_parts[1]] return out def ls(self, path="", detail=False, **kwargs): """ List files in the gist. Gists are single-level, so any 'path' is basically the filename, or empty for all files. Parameters ---------- path : str, optional The filename to list. If empty, returns all files in the gist. detail : bool, default False If True, return a list of dicts; if False, return a list of filenames. """ path = self._strip_protocol(path or "") # If path is empty, return all if path == "": results = self.dircache[""] else: # We want just the single file with this name all_files = self.dircache[""] results = [f for f in all_files if f["name"] == path] if not results: raise FileNotFoundError(path) if detail: return results else: return sorted(f["name"] for f in results) def _open(self, path, mode="rb", block_size=None, **kwargs): """ Read a single file from the gist. """ if mode != "rb": raise NotImplementedError("GitHub Gist FS is read-only (no write).") path = self._strip_protocol(path) # Find the file entry in our dircache matches = [f for f in self.dircache[""] if f["name"] == path] if not matches: raise FileNotFoundError(path) finfo = matches[0] raw_url = finfo.get("raw_url") if not raw_url: raise FileNotFoundError(f"No raw_url for file: {path}") r = requests.get(raw_url, timeout=self.timeout, **self.kw) if r.status_code == 404: raise FileNotFoundError(path) r.raise_for_status() return MemoryFile(path, None, r.content) def cat(self, path, recursive=False, on_error="raise", **kwargs): """ Return {path: contents} for the given file or files. If 'recursive' is True, and path is empty, returns all files in the gist. """ paths = self.expand_path(path, recursive=recursive) out = {} for p in paths: try: with self.open(p, "rb") as f: out[p] = f.read() except FileNotFoundError as e: if on_error == "raise": raise e elif on_error == "omit": pass # skip else: out[p] = e return out