mirror of
				https://github.com/HorlogeSkynet/SSHubl.git
				synced 2025-10-31 08:00:15 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			493 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			493 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import contextlib
 | |
| import logging
 | |
| import os
 | |
| import platform
 | |
| import shlex
 | |
| import shutil
 | |
| import subprocess
 | |
| import typing
 | |
| import uuid
 | |
| from pathlib import Path, PurePath, PureWindowsPath
 | |
| 
 | |
| try:
 | |
|     from pexpect import pxssh
 | |
| except ImportError:
 | |
|     IS_NONINTERACTIVE_SUPPORTED = False
 | |
| else:
 | |
|     IS_NONINTERACTIVE_SUPPORTED = True
 | |
| import sublime
 | |
| 
 | |
| from .paths import mounts_path, sockets_path
 | |
| from .st_utils import (
 | |
|     format_ip_addr,
 | |
|     get_absolute_purepath_flavour,
 | |
|     is_terminus_installed,
 | |
|     parse_ssh_connection,
 | |
|     pre_parse_forward_target,
 | |
| )
 | |
| from .vendor import mslex
 | |
| 
 | |
| 
 | |
| if platform.system() != "Windows":
 | |
|     platformlex = shlex
 | |
| else:
 | |
|     platformlex = mslex  # type: ignore[misc]
 | |
| 
 | |
| 
 | |
| _logger = logging.getLogger(__package__)
 | |
| 
 | |
| 
 | |
| def _settings():
 | |
|     return sublime.load_settings("SSHubl.sublime-settings")
 | |
| 
 | |
| 
 | |
| # We double-quote OpenSSH option values to properly deal with white-spaces. From our tests, this is
 | |
| # the only way to correctly deal with UNIX **and** Windows once escaping has been done.
 | |
| OPENSSH_OPTION = '-o{0}="{1}"'
 | |
| 
 | |
| ssh_program = _settings().get("ssh_path") or shutil.which("ssh")
 | |
| sshfs_program = _settings().get("sshfs_path") or shutil.which("sshfs")
 | |
| umount_program = _settings().get("umount_path")
 | |
| if platform.system() == "Linux":
 | |
|     umount_program = umount_program or shutil.which("fusermount")
 | |
|     umount_flags: typing.Tuple[str, ...] = ("-q", "-u")
 | |
| else:
 | |
|     umount_program = umount_program or shutil.which("umount")
 | |
|     if platform.system() != "Darwin":
 | |
|         umount_flags = ("-q",)
 | |
|     else:
 | |
|         umount_flags = ()
 | |
| 
 | |
| 
 | |
| class PasswordlessConnectionException(Exception):
 | |
|     """Custom exception raised when password-less authentication failed against server"""
 | |
| 
 | |
| 
 | |
| def get_base_ssh_cmd(
 | |
|     identifier: uuid.UUID,
 | |
|     args: typing.Tuple[str, ...] = tuple(),
 | |
|     add_fake_destination: bool = True,
 | |
|     program_path: typing.Optional[str] = ssh_program,
 | |
| ) -> typing.List[str]:
 | |
|     if program_path is None:
 | |
|         raise RuntimeError(f"{program_path} has not been found !")
 | |
| 
 | |
|     base_ssh_cmd = [
 | |
|         program_path,
 | |
|         OPENSSH_OPTION.format("ControlPath", sockets_path / str(identifier)),
 | |
|         # Prevent connection to fake 'destination" if control master is unavailable (inspired by
 | |
|         # <https://serverfault.com/a/914779>)
 | |
|         OPENSSH_OPTION.format("ProxyCommand", "exit 1"),
 | |
|         *args,
 | |
|     ]
 | |
| 
 | |
|     # OpenSSH CLI requires a 'destination' argument, even when connecting to a master socket
 | |
|     if add_fake_destination:
 | |
|         base_ssh_cmd.append("destination")
 | |
| 
 | |
|     return base_ssh_cmd
 | |
| 
 | |
| 
 | |
| def get_ssh_master_options(identifier: uuid.UUID) -> dict:
 | |
|     return {
 | |
|         **_settings().get("ssh_options", {}),
 | |
|         # enforce keep-alive for future sshfs usages (see upstream recommendations)
 | |
|         "ServerAliveInterval": str(_settings().get("ssh_server_alive_interval", 15)),
 | |
|         "ControlMaster": "auto",
 | |
|         "ControlPath": str(sockets_path / str(identifier)),
 | |
|         # keep connection opened for 1 minute (without new connection to control socket)
 | |
|         "ControlPersist": "60",
 | |
|     }
 | |
| 
 | |
| 
 | |
| def ssh_connect(
 | |
|     host: str,
 | |
|     port: int,
 | |
|     login: str,
 | |
|     password: typing.Optional[str] = None,
 | |
|     identifier: typing.Optional[uuid.UUID] = None,
 | |
| ) -> typing.Optional[uuid.UUID]:
 | |
|     """
 | |
|     This function connects to host using given credentials (if any) non-interactively using pexpect.
 | |
|     Connection is made using OpenSSH client, and a control master UNIX socket will be opened to
 | |
|     allow future channels multiplexing.
 | |
|     If `identifier` UUID is unset, one will be generated.
 | |
| 
 | |
|     :returns uuid.UUID: session identifier on success (or `None` on error)
 | |
|     :raises PasswordlessConnectionException: when connection failed due to authentication **and** no
 | |
|                                              password was given
 | |
|     """
 | |
|     if ssh_program is None:
 | |
|         raise RuntimeError(f"{ssh_program} has not been found !")
 | |
|     if not IS_NONINTERACTIVE_SUPPORTED:
 | |
|         raise RuntimeError("Non-interactive connection isn't supported !")
 | |
| 
 | |
|     if identifier is None:
 | |
|         identifier = uuid.uuid4()
 | |
| 
 | |
|     # run OpenSSH using pexpect to setup connection and non-interactively deal with prompts
 | |
|     ssh = pxssh.pxssh(options=get_ssh_master_options(identifier))
 | |
| 
 | |
|     # if a password has been given, force password authentication
 | |
|     if password is not None:
 | |
|         ssh.force_password = True
 | |
| 
 | |
|     try:
 | |
|         ssh.login(
 | |
|             host,
 | |
|             login,
 | |
|             password or "",
 | |
|             login_timeout=_settings().get("ssh_login_timeout", 10),
 | |
|             port=port,
 | |
|             auto_prompt_reset=False,
 | |
|             cmd=ssh_program,
 | |
|             # allow user to disable host authentication for loopback addresses
 | |
|             check_local_ip=_settings().get("ssh_host_authentication_for_localhost", True),
 | |
|         )
 | |
|     except pxssh.ExceptionPxssh as exception:
 | |
|         # if authentication failed without password, raise a specific exception
 | |
|         if password is None and str(exception) in ("password refused", "permission denied"):
 | |
|             _logger.debug("connection without password failed : %s", str(exception))
 | |
|             raise PasswordlessConnectionException from None
 | |
| 
 | |
|         _logger.error("ssh connection failed : %s", exception)
 | |
|         return None
 | |
| 
 | |
|     return identifier
 | |
| 
 | |
| 
 | |
| def ssh_connect_interactive(
 | |
|     connection_str: str,
 | |
|     identifier: typing.Optional[uuid.UUID] = None,
 | |
|     mounts: typing.Optional[typing.Dict[str, str]] = None,
 | |
|     forwards: typing.Optional[typing.List[dict]] = None,
 | |
|     window: typing.Optional[sublime.Window] = None,
 | |
| ) -> None:
 | |
|     if ssh_program is None:
 | |
|         raise RuntimeError(f"{ssh_program} has not been found !")
 | |
| 
 | |
|     if not is_terminus_installed():
 | |
|         sublime.error_message("Please install Terminus package to connect interactively !")
 | |
|         return
 | |
| 
 | |
|     if window is None:
 | |
|         window = sublime.active_window()
 | |
| 
 | |
|     if identifier is None:
 | |
|         identifier = uuid.uuid4()
 | |
| 
 | |
|     ssh_options = get_ssh_master_options(identifier)
 | |
|     if not _settings().get("ssh_host_authentication_for_localhost", True):
 | |
|         ssh_options["NoHostAuthenticationForLocalhost"] = "yes"
 | |
| 
 | |
|     host, port, login, _ = parse_ssh_connection(connection_str)
 | |
| 
 | |
|     terminus_open_args: typing.Dict[str, typing.Any] = {
 | |
|         "shell_cmd": platformlex.join(
 | |
|             (
 | |
|                 ssh_program,
 | |
|                 f"-l{login}",
 | |
|                 f"-p{port}",
 | |
|                 *[OPENSSH_OPTION.format(key, value) for key, value in ssh_options.items()],
 | |
|                 host,
 | |
|             )
 | |
|         ),
 | |
|         "title": f"{login}@{format_ip_addr(host)}:{port}",
 | |
|         "auto_close": "on_success",
 | |
|         "post_view_hooks": [
 | |
|             # makes Terminus executes a command which will wait for SSH connection to actually
 | |
|             # succeed before storing session in project data
 | |
|             (
 | |
|                 "ssh_interactive_connection_watcher",
 | |
|                 {
 | |
|                     "identifier": str(identifier),
 | |
|                     "connection_str": connection_str,
 | |
|                     "mounts": mounts,
 | |
|                     "forwards": forwards,
 | |
|                 },
 | |
|             ),
 | |
|         ],
 | |
|     }
 | |
| 
 | |
|     # Development note : please see `SshTerminalCommand` own documentation for below block rationale
 | |
|     if not _settings().get("honor_spell_check"):
 | |
|         terminus_open_args["view_settings"] = {
 | |
|             "spell_check": False,
 | |
|         }
 | |
| 
 | |
|     window.run_command("terminus_open", terminus_open_args)
 | |
| 
 | |
| 
 | |
| def ssh_disconnect(identifier: uuid.UUID) -> None:
 | |
|     """
 | |
|     Kill a SSH connection master, causing session graceful disconnection.
 | |
| 
 | |
|     Opened forwards are automatically closed, but sshfs mounts **ARE NOT**, please call
 | |
|     `umount_sshfs` **beforehand**.
 | |
|     """
 | |
| 
 | |
|     # delete base mounts path (in a best effort manner) to keep `mounts_path` as clean as possible
 | |
|     with contextlib.suppress(OSError):
 | |
|         (mounts_path / str(identifier)).rmdir()
 | |
| 
 | |
|     _logger.debug("killing %s master...", identifier)
 | |
| 
 | |
|     try:
 | |
|         subprocess.check_call(
 | |
|             get_base_ssh_cmd(identifier, ("-O", "exit")),
 | |
|             stderr=subprocess.PIPE,
 | |
|             text=True,
 | |
|         )
 | |
|     except subprocess.CalledProcessError as error:
 | |
|         # if this fails, we assume session is somehow already down
 | |
|         _logger.warning("could not request master to exit : %s", (error.stderr or "Unknown error"))
 | |
| 
 | |
| 
 | |
| def mount_sshfs(
 | |
|     identifier: uuid.UUID, remote_path: PurePath, mount_path: typing.Optional[Path] = None
 | |
| ) -> typing.Optional[Path]:
 | |
|     """
 | |
|     Mount `remote_path` from `identifier` session using sshfs.
 | |
|     When `mount_path` is None, an unique mount path relative to session will be generated.
 | |
| 
 | |
|     Some options are passed to sshfs in order to :
 | |
|         * enable (local) UNIX permissions check
 | |
|         * follow remote symbolic links (if not disabled)
 | |
|         * map remote user UID/GID to local user
 | |
| 
 | |
|     :returns Path: local mount path on success , or `None` on error
 | |
|     """
 | |
|     if mount_path is None:
 | |
|         mount_path = mounts_path / str(identifier) / f"{remote_path.name}_{uuid.uuid4()}"
 | |
|     mount_path.mkdir(parents=True, exist_ok=True)
 | |
| 
 | |
|     sshfs_arguments = _settings().get("sshfs_arguments", []).copy()
 | |
|     if not sshfs_arguments:
 | |
|         if platform.system() == "Darwin":
 | |
|             # See <https://github.com/macfuse/macfuse/wiki/Mount-Options> and
 | |
|             # <https://github.com/macfuse/macfuse/issues/519> for implied Finder limitations.
 | |
|             sshfs_arguments = [
 | |
|                 "-ojail_symlinks",  # prefix absolute symbolic links by mount point
 | |
|                 "-onoappledouble",  # deny access to Apple Double and .DS_Store files
 | |
|                 "-onoapplexattr",  # deny access to xattr beginning with "com.apple." prefix
 | |
|             ]
 | |
| 
 | |
|     # follow symbolic links by default
 | |
|     # /!\ Symlink loops are known to break Sublime project folders scanning (see #54).
 | |
|     if _settings().get("sshfs_mount_follow_symlinks", True):
 | |
|         sshfs_arguments.append("-ofollow_symlinks")
 | |
| 
 | |
|     try:
 | |
|         subprocess.check_call(
 | |
|             get_base_ssh_cmd(
 | |
|                 identifier,
 | |
|                 (
 | |
|                     # user supplied arguments (and optionally "follow_symlinks")
 | |
|                     *sshfs_arguments,
 | |
|                     # enable local permissions check
 | |
|                     "-odefault_permissions",
 | |
|                     # map remote user UID/GID to current user
 | |
|                     "-oidmap=user",
 | |
|                     f"-ouid={os.getuid()}",
 | |
|                     f"-ogid={os.getgid()}",
 | |
|                     # fake 'destination' (see `get_base_ssh_cmd` for rationale)
 | |
|                     f"destination:{remote_path}",
 | |
|                     str(mount_path),
 | |
|                 ),
 | |
|                 add_fake_destination=False,
 | |
|                 program_path=sshfs_program,
 | |
|             ),
 | |
|             stderr=subprocess.PIPE,
 | |
|             text=True,
 | |
|         )
 | |
|     except subprocess.CalledProcessError as error:
 | |
|         _logger.error(
 | |
|             "could not mount %s over sshfs : %s",
 | |
|             remote_path,
 | |
|             (error.stderr or "Unknown error").rstrip(),
 | |
|         )
 | |
| 
 | |
|         # delete prepared directory as mounting operation failed
 | |
|         with contextlib.suppress(FileNotFoundError):
 | |
|             mount_path.rmdir()
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     return mount_path
 | |
| 
 | |
| 
 | |
| def umount_sshfs(mount_path: Path) -> bool:
 | |
|     """
 | |
|     Unmount `mount_path`, considering it has been mounted using sshfs.
 | |
| 
 | |
|     :returns bool: `True` on success and `False` on error
 | |
|     """
 | |
|     if umount_program is None:
 | |
|         _logger.warning(
 | |
|             "%s has not been found, skipping unmounting of %s...", umount_program, mount_path
 | |
|         )
 | |
|         return False
 | |
| 
 | |
|     _logger.debug("unmounting %s...", mount_path)
 | |
| 
 | |
|     try:
 | |
|         subprocess.check_call(
 | |
|             [umount_program, *umount_flags, str(mount_path)],
 | |
|             stderr=subprocess.PIPE,
 | |
|             text=True,
 | |
|         )
 | |
|     except subprocess.CalledProcessError as error:
 | |
|         _logger.warning(
 | |
|             "could not unmount %s : %s", mount_path, (error.stderr or "Unknown error").rstrip()
 | |
|         )
 | |
|         return False
 | |
| 
 | |
|     _logger.debug("successfully unmounted %s, removing mount point...", mount_path)
 | |
| 
 | |
|     with contextlib.suppress(FileNotFoundError):
 | |
|         mount_path.rmdir()
 | |
| 
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def _remove_unix_domain_socket(
 | |
|     identifier: uuid.UUID, socket_path: str, *, is_reverse: bool
 | |
| ) -> None:
 | |
|     if not is_reverse:
 | |
|         Path(socket_path).unlink(missing_ok=True)
 | |
|         return
 | |
| 
 | |
|     remove_socket_cmd: typing.Tuple[str, ...]
 | |
|     if isinstance(get_absolute_purepath_flavour(socket_path), PureWindowsPath):
 | |
|         remove_socket_cmd = ("del", "/q", mslex.quote(socket_path))
 | |
|     else:
 | |
|         remove_socket_cmd = ("rm", "-f", "--", shlex.quote(socket_path))
 | |
| 
 | |
|     if ssh_exec(identifier, remove_socket_cmd) is None:
 | |
|         _logger.warning("couldn't remove remote UNIX domain socket : %s", socket_path)
 | |
| 
 | |
| 
 | |
| def ssh_forward(
 | |
|     identifier: uuid.UUID, do_open: bool, is_reverse: bool, target_1: str, target_2: str
 | |
| ) -> typing.Optional[dict]:
 | |
|     """
 | |
|     Open/Close (reverse) port/UNIX domain socket forwarding, and return a dict uniquely identifying
 | |
|     it on success.
 | |
|     If an error occurs, it is logged and `None` is returned.
 | |
|     """
 | |
|     if is_reverse:
 | |
|         target_local, target_remote = target_2, target_1
 | |
|     else:
 | |
|         target_local, target_remote = target_1, target_2
 | |
| 
 | |
|     try:
 | |
|         stdout = subprocess.check_output(
 | |
|             get_base_ssh_cmd(
 | |
|                 identifier,
 | |
|                 (
 | |
|                     "-O",
 | |
|                     "forward" if do_open else "cancel",
 | |
|                     "-R" if is_reverse else "-L",
 | |
|                     f"{target_1}:{target_2}",
 | |
|                 ),
 | |
|             ),
 | |
|             stderr=subprocess.PIPE,
 | |
|             text=True,
 | |
|         )
 | |
|     except subprocess.CalledProcessError as error:
 | |
|         _logger.error(
 | |
|             "could not %s forward %s %s %s : %s",
 | |
|             "open" if do_open else "close",
 | |
|             target_local,
 | |
|             "<-" if is_reverse else "->",
 | |
|             target_remote,
 | |
|             (error.stderr or "Unknown error").rstrip(),
 | |
|         )
 | |
|         return None
 | |
| 
 | |
|     # if port is expected to be dynamically-allocated by remote, update `target_remote` to allow
 | |
|     # future forward requests to re-use the same port
 | |
|     target_1_host, target_1_port_str = pre_parse_forward_target(target_1)
 | |
|     try:
 | |
|         target_1_port = int(target_1_port_str or "")
 | |
|     except ValueError:
 | |
|         target_1_port = None
 | |
|     if do_open and is_reverse and target_1_port == 0:
 | |
|         try:
 | |
|             remote_port = int(stdout)
 | |
|         except ValueError:
 | |
|             _logger.warning("could not retrieve port allocated by remote from : %s", stdout)
 | |
|         else:
 | |
|             _logger.debug(
 | |
|                 "remote successfully allocated %d for reverse forward to %s",
 | |
|                 remote_port,
 | |
|                 target_local,
 | |
|             )
 | |
|             target_remote = f"{target_1_host}:{remote_port}"
 | |
| 
 | |
|     # when closing an UNIX domain socket forward, also remove socket from disk to allow future
 | |
|     # forward requests to re-use the same path
 | |
|     if not do_open and target_1_port is None:
 | |
|         _remove_unix_domain_socket(identifier, target_1, is_reverse=is_reverse)
 | |
| 
 | |
|     _logger.debug(
 | |
|         "successfully %s forward %s %s %s",
 | |
|         "opened" if do_open else "closed",
 | |
|         target_local,
 | |
|         "<-" if is_reverse else "->",
 | |
|         target_remote,
 | |
|     )
 | |
|     return {
 | |
|         "is_reverse": is_reverse,
 | |
|         "orig_target_1": target_1,
 | |
|         "orig_target_2": target_2,
 | |
|         "target_local": target_local,
 | |
|         "target_remote": target_remote,
 | |
|     }
 | |
| 
 | |
| 
 | |
| def ssh_exec(identifier: uuid.UUID, args: typing.Iterable[str]) -> typing.Optional[str]:
 | |
|     """
 | |
|     Execute `args` command remotely using a non-interactive pseudo-TTY.
 | |
|     `args` arguments **ARE NOT** escaped.
 | |
|     """
 | |
|     try:
 | |
|         stdout = subprocess.check_output(
 | |
|             [
 | |
|                 *get_base_ssh_cmd(
 | |
|                     identifier,
 | |
|                     # force PTY allocation as we may execute a command that requires one
 | |
|                     ("-q", "-tt"),
 | |
|                 ),
 | |
|                 "--",
 | |
|                 # pass remote command to OpenSSH as an unique argument
 | |
|                 " ".join(args),
 | |
|             ],
 | |
|             stderr=subprocess.PIPE,
 | |
|             text=True,
 | |
|         )
 | |
|     except subprocess.CalledProcessError as error:
 | |
|         _logger.debug(
 | |
|             "executing %s on remote failed with %d : %s",
 | |
|             list(args),
 | |
|             error.returncode,
 | |
|             (error.stderr or "Unknown error").rstrip(),
 | |
|         )
 | |
|         return None
 | |
| 
 | |
|     _logger.debug("successfully executed %s remotely : stdout=%s", args, stdout.rstrip())
 | |
|     return stdout
 | |
| 
 | |
| 
 | |
| def ssh_check_master(identifier: uuid.UUID) -> bool:
 | |
|     try:
 | |
|         subprocess.check_call(
 | |
|             get_base_ssh_cmd(identifier, ("-O", "check")),
 | |
|             stdout=subprocess.DEVNULL,
 | |
|             stderr=subprocess.DEVNULL,
 | |
|         )
 | |
|     except subprocess.CalledProcessError:
 | |
|         return False
 | |
| 
 | |
|     return True
 |