mirror of
https://github.com/HorlogeSkynet/SSHubl.git
synced 2025-03-27 04:00:13 +01:00
197 lines
6.2 KiB
Python
197 lines
6.2 KiB
Python
import contextlib
|
|
import functools
|
|
import getpass
|
|
import ipaddress
|
|
import re
|
|
import typing
|
|
from pathlib import PurePath, PurePosixPath, PureWindowsPath
|
|
from urllib.parse import urlparse
|
|
|
|
import sublime_plugin
|
|
|
|
# Regular expression matching a (possibly dotted) host name, where each label is made of
# alphanumeric characters and inner hyphens (taken from <https://stackoverflow.com/a/106223>).
HOSTNAME_REGEXP = re.compile(
    r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"  # pylint: disable=line-too-long
)

# Non-IP "bind address" values which are handled specially by forward target helpers below.
OPENSSH_SPECIAL_BIND_ADDRESSES = (
    "localhost",
    "*",
)
|
|
|
|
|
|
def conditional_cache(no_cache_result: typing.Optional[tuple] = None):
    """
    A conditional cache wrapper, adapted from <https://stackoverflow.com/a/68665480>.

    The decorated function's results are memoized by call arguments, except results which are
    members of `no_cache_result` : those are returned but never stored, so the next call with
    the same arguments runs the function again.

    Positional and keyword arguments must be hashable, as they are used as dictionary key.

    :param no_cache_result: tuple of results that must not be cached (defaults to empty tuple)
    :return: a decorator to apply to the function to memoize
    """
    if no_cache_result is None:
        no_cache_result = tuple()

    def decorator(func):
        # The cache lives here (and not in the enclosing scope) so that each decorated function
        # owns its entries, even when a single `conditional_cache(...)` object is re-used to
        # decorate several functions.
        cache: typing.Dict[typing.Tuple[typing.Any, ...], typing.Any] = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Keyword arguments are sorted by name so that `f(a=1, b=2)` and `f(b=2, a=1)`
            # share the same cache entry (names are unique, values are hence never compared).
            key = (args, tuple(sorted(kwargs.items())))
            if key in cache:
                return cache[key]

            res = func(*args, **kwargs)
            if res not in no_cache_result:
                cache[key] = res

            return res

        return wrapper

    return decorator
|
|
|
|
|
|
@conditional_cache(no_cache_result=(None,))
def get_command_class(class_name: str) -> typing.Optional[typing.Type[sublime_plugin.Command]]:
    """
    Look up a Sublime command class by its class name.

    Only classes listed in `sublime_plugin.window_command_classes` are inspected : the first one
    whose `__name__` equals `class_name` is returned, and `None` is returned when none matches.

    Successful lookups are cached ; `None` results are not, so a missing command is searched
    again on next call.
    """
    return next(
        (
            candidate
            for candidate in sublime_plugin.window_command_classes
            if candidate.__name__ == class_name
        ),
        None,
    )
|
|
|
|
|
|
def is_terminus_installed() -> bool:
    """Tell whether a window command class named `TerminusOpenCommand` is known."""
    terminus_open_command = get_command_class("TerminusOpenCommand")
    return terminus_open_command is not None
|
|
|
|
|
|
@functools.lru_cache()
def format_ip_addr(host: str) -> str:
    """
    Format `host` for display.

    When `host` parses as an IPv6 address, its "compressed" form is returned enclosed in square
    brackets. Any other value (IPv4 address, host name, ...) is returned untouched.
    """
    try:
        parsed_address = ipaddress.ip_address(host)
    except ValueError:
        # not an IP address at all
        return host

    if parsed_address.version != 6:
        return host

    return f"[{parsed_address.compressed}]"
|
|
|
|
|
|
@functools.lru_cache()
def get_absolute_purepath_flavour(path: str) -> typing.Optional[PurePath]:
    """
    Build the `PurePath` flavour object under which `path` is absolute.

    Windows flavour is probed first, POSIX flavour second. `None` is returned when `path` is
    not absolute in either flavour (i.e. it's relative).
    """
    for flavour in (PureWindowsPath, PurePosixPath):
        candidate: PurePath = flavour(path)
        if candidate.is_absolute():
            return candidate

    return None
|
|
|
|
|
|
def parse_ssh_connection(connection_str: str) -> typing.Tuple[str, int, str, typing.Optional[str]]:
    """
    Split an SSHubl connection string into a `(host, port, login, password)` tuple.

    The string is parsed as the network location of an `ssh://` URL, and :
    * host falls back to an empty string when absent
    * port falls back to 22 when absent
    * login falls back to current session username when absent
    * password is `None` when absent

    :raises ValueError: when connection string could not be parsed
    """
    url = urlparse(f"ssh://{connection_str}")

    host = url.hostname or ""
    port = url.port or 22
    login = url.username or getpass.getuser()
    password = url.password

    return host, port, login, password
|
|
|
|
|
|
def pre_parse_forward_target(forward_str: str) -> typing.Tuple[str, typing.Optional[str]]:
    """
    (pre-)Parse OpenSSH client forward target string.

    The string is cut on its last colon into a `(host, port)` tuple. When there is no colon
    (lone port or UNIX domain socket path), the whole string is returned as "host" and "port"
    is `None`.

    Square brackets enclosing "host" (IPv6 address notation) are stripped.
    """
    host, separator, port = forward_str.rpartition(":")
    if not separator:
        # no colon at all : nothing to split
        return forward_str, None

    is_enclosed = host.startswith("[") and host.endswith("]")
    return (host[1:-1] if is_enclosed else host), port
|
|
|
|
|
|
@functools.lru_cache()
def pretty_forward_target(forward_str: str) -> str:
    """
    Pretty format OpenSSH client forward target string.

    Targets without a "port" part (e.g. UNIX domain socket paths) are returned as is.
    For `host:port` targets, only `port` is returned when `host` is either :
    * one of "OpenSSH special bind addresses"
    * a "loopback" IP address
    * an "unspecified" IP address

    Any other `host:port` target is returned unaltered.
    """
    host, port = pre_parse_forward_target(forward_str)
    if port is None:
        return host

    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # not an IP address, host is hidden only if it matches known values
        hide_host = host in OPENSSH_SPECIAL_BIND_ADDRESSES
    else:
        # IP address, host is hidden only if it's considered "loopback" or "unspecified"
        hide_host = address.is_loopback or address.is_unspecified

    return port if hide_host else forward_str
|
|
|
|
|
|
@functools.lru_cache()
def validate_forward_target(forward_str: str) -> bool:
    """
    Validate OpenSSH client forward target (either source or destination).

    This function must validate each following forwarding target (-L/-R formats) :
    * port
    * host:port
    * bind_address:port
    * [bind_address_v6]:port
    * socket

    Targets without any colon (lone port or UNIX domain socket path) are always accepted.
    Otherwise, "host" must be an OpenSSH special bind address, a valid host name or a valid
    IP address, and "port" must be a plain decimal number within 0-65535 range.

    :return: `True` when `forward_str` is considered valid, `False` otherwise
    """
    host, port = pre_parse_forward_target(forward_str)
    if port is None:
        return True

    # allow OpenSSH special "bind addresses" as well as any valid domain name
    # parse `host` as an IP address otherwise
    if host not in OPENSSH_SPECIAL_BIND_ADDRESSES and HOSTNAME_REGEXP.match(host) is None:
        try:
            ipaddress.ip_address(host)
        except ValueError:
            return False

    # `int()` alone is too lenient for a port : it accepts signs, surrounding blanks,
    # underscores and non-ASCII digits, so only plain ASCII digits are allowed here
    if not port or any(char not in "0123456789" for char in port):
        return False

    try:
        # a port number is 16-bit wide
        return int(port) <= 65535
    except ValueError:
        # `int()` may refuse to convert excessively long digit strings
        return False
|