1
0
mirror of https://github.com/HorlogeSkynet/SSHubl.git synced 2025-04-15 04:00:14 +02:00

Merge 64d46ff71fd95f5e6aeb9436e056bb5a089ea5bb into 7b8dd83f6ef0a0f0aacf213697c9be1a628fd521

This commit is contained in:
Samuel FORESTIER 2024-11-04 21:58:38 +00:00 committed by GitHub
commit 963767c1a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 734 additions and 87 deletions

@ -9,12 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Interactive SSH connection (through Terminus)
- Disable spellcheck in remote terminal view (Terminus v0.3.32+)
### Fixed
- Plugin loading on Windows
- UNIX domain socket removal against Windows remote
- `ssh_host_authentication_for_localhost` cannot be disabled
### Removed
- `terminus_is_installed` (hidden) setting
## [0.4.0] - 2024-08-07
### Added

@ -11,6 +11,13 @@
"command": "ssh_connect",
"caption": "SSHubl: Connect to server"
},
{
"command": "ssh_connect_interactive",
"caption": "SSHubl: Connect to server (interactively)"
},
{
"command": "ssh_interactive_connection_watcher"
},
{
"command": "ssh_disconnect",
"caption": "SSHubl: Disconnect from server"

@ -16,6 +16,10 @@
"command": "ssh_connect",
"caption": "Connect to server"
},
{
"command": "ssh_connect",
"caption": "Connect to server (interactively)"
},
{
"command": "ssh_disconnect",
"caption": "Disconnect from server"

@ -22,8 +22,8 @@ It has been inspired by Visual Studio Code [Remote - SSH](https://marketplace.vi
* Sublime Text 4081+
* OpenSSH client
* sshfs (FUSE) client
* pexpect (Python package)
* [Terminus](https://packagecontrol.io/packages/Terminus) (Sublime Text package, for remote terminal feature)
* pexpect Python package (used for non-interactive SSH connection on Linux/macOS)
* [Terminus](https://packagecontrol.io/packages/Terminus) Sublime Text package (used for remote terminal feature on Linux/macOS, **required** on Windows)
On Debian : `apt-get install -y sshfs`
@ -43,8 +43,9 @@ Package Control dedicated page [here](https://packagecontrol.io/packages/SSHubl)
1. Go to the Sublime Text packages folder (usually `$HOME/.config/sublime-text/Packages/` or `%AppData%\Sublime Text\Packages\`)
2. Clone this repository there : `git clone https://github.com/HorlogeSkynet/SSHubl.git`
3. Satisfy `pexpect` and `ptyprocess` third-party dependencies in Sublime Text `Lib/python38/` folder (see [here](https://stackoverflow.com/a/61200528) for further information)
4. Restart Sublime Text and... :tada:
3. \[Linux/macOS\] Satisfy either `pexpect` and `ptyprocess` third-party dependencies in Sublime Text `Lib/python38/` folder (see [here](https://stackoverflow.com/a/61200528) for further information) or [Terminus](https://packagecontrol.io/packages/Terminus) Sublime Text package dependency
4. \[Windows\] Satisfy [Terminus](https://packagecontrol.io/packages/Terminus) Sublime Text package dependency
5. Restart Sublime Text and... :tada:
## Usage
@ -81,7 +82,7 @@ Open your command palette and type in `SSHubl` to select `Connect to server`. On
## Frequently Asked Questions
### Why can I connect to new hosts without accepting their fingerprint ?
### Why can I non-interactively connect to new hosts without accepting their fingerprint ?
> `pexpect` package is [known to always accept remotes' public key](https://github.com/pexpect/pexpect/blob/4.9/pexpect/pxssh.py#L411-L414), and it isn't configurable.

@ -1,4 +1,8 @@
{
"$schema": "sublime://packagecontrol.io/schemas/dependencies",
"windows": {
"*": []
},
"*": {
">4000": [
"pexpect",

@ -8,10 +8,12 @@ ST_REQUIRED_MINIMUM_VERSION = 4081
for suffix in (
# constants
"paths",
# vendors
"vendor.mslex",
# utilities
"project_data",
"ssh_utils",
"st_utils",
"ssh_utils",
# controllers
"actions",
# commands and listeners (at last, as they depend on other modules)
@ -30,7 +32,9 @@ else:
SshCancelForwardCommand,
SshCloseDirCommand,
SshConnectCommand,
SshConnectInteractiveCommand,
SshConnectPasswordCommand,
SshInteractiveConnectionWatcherCommand,
SshDisconnectCommand,
SshOpenDirCommand,
SshRequestForwardCommand,

@ -7,6 +7,9 @@ ignored-modules = [
"sublime",
"sublime_plugin",
]
ignore-paths = [
"sshubl/vendor",
]
jobs = 0
load-plugins = [
"pylint_secure_coding_standard",
@ -17,6 +20,9 @@ min-public-methods = 0
[tool.mypy]
check_untyped_defs = true
exclude = [
"sshubl/vendor",
]
[[tool.mypy.overrides]]
module = "sublime.*"
@ -28,3 +34,6 @@ ignore_missing_imports = true
[tool.ruff]
line-length = 100
exclude = [
"sshubl/vendor",
]

@ -19,6 +19,7 @@ from .ssh_utils import (
mount_sshfs,
ssh_check_master,
ssh_connect,
ssh_connect_interactive,
ssh_disconnect,
ssh_forward,
umount_sshfs,
@ -32,6 +33,49 @@ from .st_utils import (
_logger = logging.getLogger(__package__)
def _on_connection(
view: sublime.View,
ssh_session: SshSession,
mounts: typing.Optional[typing.Dict[str, str]] = None,
forwards: typing.Optional[typing.List[dict]] = None,
) -> None:
# store SSH session metadata in project data
# Development note : on **re-connection**, mounts and forwards are reset here and will be
# directly re-populated by thread actions below
ssh_session.set_in_project_data(view.window())
_logger.info("successfully connected to %s !", ssh_session)
update_window_status(view.window())
# re-mount and re-open previous remote folders (if any)
for mount_path, remote_path in (mounts or {}).items():
SshMountSshfs(
view,
uuid.UUID(ssh_session.identifier),
# here paths are strings due to JSON serialization, infer flavour back for remote
mount_path=Path(mount_path),
remote_path=typing.cast(PurePath, get_absolute_purepath_flavour(remote_path)),
).start()
# re-open previous forwards (if any)
for forward in forwards or []:
# infer original forwarding rule from "local" and "remote" targets
is_reverse = forward["is_reverse"]
target_1, target_2 = (
forward["target_remote"] if is_reverse else forward["target_local"],
forward["target_local"] if is_reverse else forward["target_remote"],
)
SshForward(
view,
uuid.UUID(ssh_session.identifier),
is_reverse,
target_1,
target_2,
).start()
class SshConnect(Thread):
def __init__( # pylint: disable=too-many-arguments
self,
@ -91,45 +135,91 @@ class SshConnect(Thread):
finally:
self.view.erase_status("zz_connection_in_progress")
if identifier is None:
return
# store SSH session metadata in project data
# Development note : on **re-connection**, mounts and forwards are reset here and will be
# directly re-populated by thread actions below
ssh_session = SshSession(str(identifier), host, port, login)
ssh_session.set_in_project_data(self.view.window())
_logger.info("successfully connected to %s !", ssh_session)
update_window_status(self.view.window())
# re-mount and re-open previous remote folders (if any)
for mount_path, remote_path in self.mounts.items():
SshMountSshfs(
if identifier is not None:
_on_connection(
self.view,
identifier,
# here paths are strings due to JSON serialization, infer flavour back for remote
mount_path=Path(mount_path),
remote_path=typing.cast(PurePath, get_absolute_purepath_flavour(remote_path)),
).start()
# re-open previous forwards (if any)
for forward in self.forwards:
# infer original forwarding rule from "local" and "remote" targets
is_reverse = forward["is_reverse"]
target_1, target_2 = (
forward["target_remote"] if is_reverse else forward["target_local"],
forward["target_local"] if is_reverse else forward["target_remote"],
SshSession(str(identifier), host, port, login),
self.mounts,
self.forwards,
)
SshForward(
self.view,
identifier,
is_reverse,
target_1,
target_2,
).start()
class SshInteractiveConnectionWatcher(Thread):
def __init__( # pylint: disable=too-many-arguments
self,
view: sublime.View,
identifier: uuid.UUID,
connection_str: str,
mounts: typing.Optional[typing.Dict[str, str]] = None,
forwards: typing.Optional[typing.List[dict]] = None,
):
self.view = view
self.identifier = identifier
self.connection_str = connection_str
# below attributes are only used in case of re-connection
self.mounts = mounts or {}
self.forwards = forwards or []
super().__init__()
def run(self):
_logger.debug(
"interactive connection watcher is starting up for %s (view=%d)...",
self.identifier,
self.view.id(),
)
host, port, login, _ = parse_ssh_connection(self.connection_str)
_logger.debug(
"SSH connection string is : %s@%s:%d",
login,
format_ip_addr(host),
port,
)
self.view.set_status(
"zz_connection_in_progress",
f"Connecting to ssh://{login}@{format_ip_addr(host)}:{port}...",
)
try:
while True:
# we fetch view "validity" _here_ to prevent a race condition when user closes the
# view right *after* we actually checked whether connection succeeded.
is_view_valid = self.view.is_valid()
# when master is considered "up" (i.e. client successfully connected to server), run
# connection postlude and leave
if ssh_check_master(self.identifier):
_on_connection(
self.view,
SshSession(str(self.identifier), host, port, login, is_interactive=True),
self.mounts,
self.forwards,
)
break
# stop this thread if view was closed (i.e. client has terminated)
if not is_view_valid:
# if view corresponded to a reconnection attempt, we have to update `is_up`
# session attribute as current attempt failed
with project_data_lock:
ssh_session = SshSession.get_from_project_data(self.identifier)
if ssh_session is not None:
ssh_session.is_up = False
ssh_session.set_in_project_data(self.view.window())
break
time.sleep(2)
finally:
self.view.erase_status("zz_connection_in_progress")
_logger.debug(
"interactive connection watcher is shutting down for %s (view=%d)...",
self.identifier,
self.view.id(),
)
class SshDisconnect(Thread):
@ -290,7 +380,7 @@ class SshKeepaliveThread(Thread):
If master fails to answer, a re-connection attempt occurs.
"""
__LOOP_PERIOD = 15
__LOOP_PERIOD = 10
def __init__(self, *args, window: sublime.Window, **kwargs):
self.window = window
@ -332,13 +422,22 @@ class SshKeepaliveThread(Thread):
continue
_logger.warning("%s's master is down : attempting to reconnect...", ssh_session)
SshConnect(
self.window.active_view(),
str(ssh_session),
session_identifier,
ssh_session.mounts,
ssh_session.forwards,
).start()
if ssh_session.is_interactive:
ssh_connect_interactive(
str(ssh_session),
session_identifier,
ssh_session.mounts,
ssh_session.forwards,
self.window,
)
else:
SshConnect(
self.window.active_view(),
str(ssh_session),
session_identifier,
ssh_session.mounts,
ssh_session.forwards,
).start()
# set "up" status to `None` so we know a re-connection attempt is in progress
ssh_session.is_up = None

@ -14,17 +14,21 @@ from .actions import (
SshDisconnect,
SshForward,
SshMountSshfs,
SshInteractiveConnectionWatcher,
schedule_ssh_connect_password_command,
)
from .project_data import SshSession
from .ssh_utils import (
IS_NONINTERACTIVE_SUPPORTED,
get_base_ssh_cmd,
platformlex,
ssh_exec,
ssh_connect_interactive,
)
from .st_utils import (
format_ip_addr,
get_absolute_purepath_flavour,
get_command_class,
is_terminus_installed,
parse_ssh_connection,
validate_forward_target,
)
@ -158,18 +162,26 @@ class _SshSessionInputHandler(sublime_plugin.ListInputHandler):
class _ConnectInputHandler(sublime_plugin.TextInputHandler):
def __init__(self, *args, with_password: bool = True, **kwargs):
self.with_password = with_password
super().__init__(*args, **kwargs)
def name(self):
return "connection_str"
def placeholder(self):
return "user[:password]@host[:port]"
return f"user{'[:password]' if self.with_password else ''}@host[:port]"
def validate(self, text):
try:
host, *_ = parse_ssh_connection(text)
host, _, __, password = parse_ssh_connection(text)
except ValueError:
return False
if not self.with_password and password is not None:
return False
return bool(host)
@ -177,6 +189,9 @@ class SshConnectCommand(sublime_plugin.TextCommand):
def run(self, _edit, connection_str: str):
SshConnect(self.view, connection_str).start()
def is_enabled(self):
return IS_NONINTERACTIVE_SUPPORTED
def input(self, _args):
return _ConnectInputHandler()
@ -325,6 +340,40 @@ class SshConnectPasswordCommand(sublime_plugin.WindowCommand):
ssh_connect_password_command_lock.release()
class SshConnectInteractiveCommand(sublime_plugin.TextCommand):
def run(self, _edit, connection_str: str):
ssh_connect_interactive(connection_str, window=self.view.window())
def input(self, _args):
return _ConnectInputHandler(with_password=False)
def input_description(self):
return "SSH: Connect to server"
class SshInteractiveConnectionWatcherCommand(sublime_plugin.TextCommand):
"""
(Hidden) command which only purpose is to be called by Terminus (setup via `post_view_hooks` in
`ssh_connect_interactive`) to watch interactive SSH connections and eventually store session in
project data.
"""
def run( # pylint: disable=too-many-arguments
self,
_edit,
connection_str: str,
identifier: str,
mounts: typing.Optional[typing.Dict[str, str]] = None,
forwards: typing.Optional[typing.List[dict]] = None,
):
SshInteractiveConnectionWatcher(
self.view, uuid.UUID(identifier), connection_str, mounts, forwards
).start()
def is_visible(self):
return False
class SshDisconnectCommand(sublime_plugin.TextCommand):
@_with_session_identifier
def run(self, _edit, identifier: str):
@ -708,11 +757,7 @@ class SshCloseDirCommand(sublime_plugin.TextCommand):
class SshTerminalCommand(sublime_plugin.TextCommand):
@_with_session_identifier
def run(self, _edit, identifier: str):
# check for Terminus `terminus_open` command support before actually continuing.
# we check for a (hidden) setting which allows package lookup bypass for developers who know
# what they're doing
terminus_open_command = get_command_class("TerminusOpenCommand")
if not _settings().get("terminus_is_installed") and terminus_open_command is None:
if not is_terminus_installed():
sublime.error_message("Please install Terminus package to open a remote terminal !")
return
@ -723,14 +768,14 @@ class SshTerminalCommand(sublime_plugin.TextCommand):
title = str(ssh_session) if ssh_session is not None else None
terminus_open_args: typing.Dict[str, typing.Any] = {
"shell_cmd": shlex.join(get_base_ssh_cmd(session_identifier, ("-q",))),
"shell_cmd": platformlex.join(get_base_ssh_cmd(session_identifier, ("-q",))),
"title": title,
}
# Disable spellcheck in terminal view as it's usually irrelevant and report many misspelled
# words on shells. Moreover, as we're connected to a different host it's even likely local
# and remote locales do not match. Although, as it's an opinionated take, we also check for
# a(nother) hidden setting before doing so :-)
# a hidden setting before doing so :-)
# Development note : `view_settings` argument is only supported by Terminus v0.3.32+
if not _settings().get("honor_spell_check"):
terminus_open_args["view_settings"] = {

@ -96,13 +96,14 @@ def remove_from_project_folders(
@dataclasses.dataclass
class SshSession:
class SshSession: # pylint: disable=too-many-instance-attributes
identifier: str
host: str
port: int
login: str
mounts: typing.Dict[str, str] = dataclasses.field(default_factory=dict)
forwards: typing.List[typing.Dict[str, typing.Any]] = dataclasses.field(default_factory=list)
is_interactive: bool = False
is_up: typing.Optional[bool] = True
def __str__(self) -> str:
@ -167,6 +168,7 @@ class SshSession:
# "target_remote": "127.0.0.1:4242", // allocated by remote
# },
# ],
# "is_interactive": false,
# "is_up": true,
# },
},

@ -7,13 +7,32 @@ import shutil
import subprocess
import typing
import uuid
from pathlib import Path, PurePath
from pathlib import Path, PurePath, PureWindowsPath
try:
from pexpect import pxssh
except ImportError:
IS_NONINTERACTIVE_SUPPORTED = False
else:
IS_NONINTERACTIVE_SUPPORTED = True
import sublime
from pexpect import pxssh
from .paths import mounts_path, sockets_path
from .st_utils import pre_parse_forward_target
from .st_utils import (
format_ip_addr,
get_absolute_purepath_flavour,
is_terminus_installed,
parse_ssh_connection,
pre_parse_forward_target,
)
from .vendor import mslex
if platform.system() != "Windows":
platformlex = shlex
else:
platformlex = mslex # type: ignore[misc]
_logger = logging.getLogger(__package__)
@ -22,6 +41,10 @@ def _settings():
return sublime.load_settings("SSHubl.sublime-settings")
# We double-quote OpenSSH option values to properly deal with white-spaces. From our tests, this is
# the only way to correctly deal with UNIX **and** Windows once escaping has been done.
OPENSSH_OPTION = '-o{0}="{1}"'
ssh_program = _settings().get("ssh_path") or shutil.which("ssh")
sshfs_program = _settings().get("sshfs_path") or shutil.which("sshfs")
umount_program = _settings().get("umount_path")
@ -48,10 +71,10 @@ def get_base_ssh_cmd(
base_ssh_cmd = [
program_path,
f"-oControlPath={str(sockets_path / str(identifier))}",
OPENSSH_OPTION.format("ControlPath", sockets_path / str(identifier)),
# Prevent connection to fake 'destination" if control master is unavailable (inspired by
# <https://serverfault.com/a/914779>)
"-oProxyCommand='exit 1'",
OPENSSH_OPTION.format("ProxyCommand", "exit 1"),
*args,
]
@ -62,6 +85,18 @@ def get_base_ssh_cmd(
return base_ssh_cmd
def get_ssh_master_options(identifier: uuid.UUID) -> dict:
return {
**_settings().get("ssh_options", {}),
# enforce keep-alive for future sshfs usages (see upstream recommendations)
"ServerAliveInterval": str(_settings().get("ssh_server_alive_interval", 15)),
"ControlMaster": "auto",
"ControlPath": str(sockets_path / str(identifier)),
# keep connection opened for 1 minute (without new connection to control socket)
"ControlPersist": "60",
}
def ssh_connect(
host: str,
port: int,
@ -81,21 +116,14 @@ def ssh_connect(
"""
if ssh_program is None:
raise RuntimeError(f"{ssh_program} has not been found !")
if not IS_NONINTERACTIVE_SUPPORTED:
raise RuntimeError("Non-interactive connection isn't supported !")
identifier = identifier or uuid.uuid4()
if identifier is None:
identifier = uuid.uuid4()
# run OpenSSH using pexpect to setup connection and non-interactively deal with prompts
ssh = pxssh.pxssh(
options={
**_settings().get("ssh_options", {}),
# enforce keep-alive for future sshfs usages (see upstream recommendations)
"ServerAliveInterval": str(_settings().get("ssh_server_alive_interval", 15)),
"ControlMaster": "auto",
"ControlPath": str(sockets_path / str(identifier)),
# keep connection opened for 1 minute (without new connection to control socket)
"ControlPersist": "60",
}
)
ssh = pxssh.pxssh(options=get_ssh_master_options(identifier))
# if a password has been given, force password authentication
if password is not None:
@ -125,6 +153,68 @@ def ssh_connect(
return identifier
def ssh_connect_interactive(
connection_str: str,
identifier: typing.Optional[uuid.UUID] = None,
mounts: typing.Optional[typing.Dict[str, str]] = None,
forwards: typing.Optional[typing.List[dict]] = None,
window: typing.Optional[sublime.Window] = None,
) -> None:
if ssh_program is None:
raise RuntimeError(f"{ssh_program} has not been found !")
if not is_terminus_installed():
sublime.error_message("Please install Terminus package to connect interactively !")
return
if window is None:
window = sublime.active_window()
if identifier is None:
identifier = uuid.uuid4()
ssh_options = get_ssh_master_options(identifier)
if not _settings().get("ssh_host_authentication_for_localhost", True):
ssh_options["NoHostAuthenticationForLocalhost"] = "yes"
host, port, login, _ = parse_ssh_connection(connection_str)
terminus_open_args: typing.Dict[str, typing.Any] = {
"shell_cmd": platformlex.join(
(
ssh_program,
f"-l{login}",
f"-p{port}",
*[OPENSSH_OPTION.format(key, value) for key, value in ssh_options.items()],
host,
)
),
"title": f"{login}@{format_ip_addr(host)}:{port}",
"auto_close": "on_success",
"post_view_hooks": [
# makes Terminus executes a command which will wait for SSH connection to actually
# succeed before storing session in project data
(
"ssh_interactive_connection_watcher",
{
"identifier": str(identifier),
"connection_str": connection_str,
"mounts": mounts,
"forwards": forwards,
},
),
],
}
# Development note : please see `SshTerminalCommand` own documentation for below block rationale
if not _settings().get("honor_spell_check"):
terminus_open_args["view_settings"] = {
"spell_check": False,
}
window.run_command("terminus_open", terminus_open_args)
def ssh_disconnect(identifier: uuid.UUID) -> None:
"""
Kill a SSH connection master, causing session graceful disconnection.
@ -233,6 +323,23 @@ def umount_sshfs(mount_path: Path) -> None:
mount_path.rmdir()
def _remove_unix_domain_socket(
identifier: uuid.UUID, socket_path: str, *, is_reverse: bool
) -> None:
if not is_reverse:
Path(socket_path).unlink(missing_ok=True)
return
remove_socket_cmd: typing.Tuple[str, ...]
if isinstance(get_absolute_purepath_flavour(socket_path), PureWindowsPath):
remove_socket_cmd = ("del", "/q", mslex.quote(socket_path))
else:
remove_socket_cmd = ("rm", "-f", "--", shlex.quote(socket_path))
if ssh_exec(identifier, remove_socket_cmd) is None:
_logger.warning("couldn't remove remote UNIX domain socket : %s", socket_path)
def ssh_forward(
identifier: uuid.UUID, do_open: bool, is_reverse: bool, target_1: str, target_2: str
) -> typing.Optional[dict]:
@ -294,15 +401,7 @@ def ssh_forward(
# when closing an UNIX domain socket forward, also remove socket from disk to allow future
# forward requests to re-use the same path
if not do_open and target_1_port is None:
if is_reverse:
unix_socket_path = shlex.quote(target_1)
if (
ssh_exec(identifier, ("rm", "-f", unix_socket_path)) is None
and ssh_exec(identifier, ("del", "/q", unix_socket_path)) is None
):
_logger.warning("couldn't remove remote UNIX domain socket : %s", unix_socket_path)
else:
Path(target_1).unlink(missing_ok=True)
_remove_unix_domain_socket(identifier, target_1, is_reverse=is_reverse)
_logger.debug(
"successfully %s forward %s %s %s",

@ -58,6 +58,10 @@ def get_command_class(class_name: str) -> typing.Optional[typing.Type[sublime_pl
return None
def is_terminus_installed() -> bool:
return get_command_class("TerminusOpenCommand") is not None
@functools.lru_cache()
def format_ip_addr(host: str) -> str:
"""

0
sshubl/vendor/__init__.py vendored Normal file

15
sshubl/vendor/mslex/LICENSE vendored Normal file

@ -0,0 +1,15 @@
Apache Software License 2.0
Copyright (c) 2019, Lawrence D'Anna
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

345
sshubl/vendor/mslex/__init__.py vendored Normal file

@ -0,0 +1,345 @@
# -*- coding: utf-8 -*-
"""
On windows, before a command line argument becomes a ``char*`` in a
program's argv, it must be parsed by both ``cmd.exe``, and by
``CommandLineToArgvW``.
For some strings there is no way to quote them so they will
parse correctly in both situations.
"""
import sys
import re
import itertools
from typing import Iterator, List, Match, TextIO, Optional # noqa: F401
from .exceptions import MSLexError
__all__ = (
"split",
"split_ucrt",
"split_msvcrt",
"strip_carets_like_cmd",
"quote",
"join",
"MSLexError",
)
__version__ = "1.3.0"
def _iter_arg_msvcrt(peek: Match[str], i: Iterator[Match[str]]) -> Iterator[str]:
quote_mode = False
for m in itertools.chain([peek], i):
space, slashes, quotes, text = m.groups()
if space:
if quote_mode:
yield space
else:
return
elif quotes:
n_slashes = len(slashes)
n_quotes = len(quotes)
slashes_odd = bool(n_slashes % 2)
yield "\\" * (n_slashes // 2)
magic_sum = n_quotes + quote_mode + 2 * slashes_odd
yield '"' * (magic_sum // 3)
quote_mode = (magic_sum % 3) == 1
else:
yield text
def split_msvcrt(s: str) -> List[str]:
"""
Split a string of command line options like `msvcrt.dll`_ does.
:param s: a string to parse
:return: a list of parsed words
This parses arguments the same way `CommandLineToArgvW`_ does, except
it does not treat ``argv[0]`` specially.
Specifically, it is the same as ``CommandLineToArgvW("foo.exe " + s)[1:]``
If the first word of ``s`` is a valid command name, then it cannot contain
any quotes, so this is the same as ``CommandLineToArgvW(s)``
.. _`CommandLineToArgvW`: https://learn.microsoft.com/en-us/windows/win32/api/shellapi\
/nf-shellapi-commandlinetoargvw
.. _`msvcrt.dll`: https://devblogs.microsoft.com/oldnewthing/20140411-00/?p=1273
"""
i = re.finditer(r"(\s+)|(\\*)(\"+)|(.[^\s\\\"]*)", s.lstrip())
return ["".join(_iter_arg_msvcrt(m, i)) for m in i]
def _iter_arg_ucrt(peek: Match[str], i: Iterator[Match[str]]) -> Iterator[str]:
quote_mode = False
for m in itertools.chain([peek], i):
space, slashes, quotes, text = m.groups()
if space:
if quote_mode:
yield space
else:
return
elif quotes:
if slashes:
yield slashes[: len(slashes) // 2]
if len(slashes) % 2:
yield '"'
quotes = quotes[1:]
while quotes:
if quote_mode and len(quotes) >= 2:
yield '"'
quotes = quotes[2:]
else:
quote_mode = not quote_mode
quotes = quotes[1:]
else:
yield text
def split_ucrt(s: str) -> List[str]:
"""
Split a string of command line options like `UCRT`_ does.
:param s: a string to parse
:return: a list of parsed words
This should compute the same function that is used by a modern windows
C runtime library to convert arguments in ``GetCommandLineW`` to
individual arguments found in ``argv``, except it does not treat
``argv[0]`` specially.
see: `Parsing C Command Line Arguments`_
.. _`UCRT`: https://learn.microsoft.com/en-us/cpp/porting/\
upgrade-your-code-to-the-universal-crt
.. _`Parsing C Command Line Arguments`: https://learn.microsoft.com/en-us/cpp/c-language\
/parsing-c-command-line-arguments
"""
i = re.finditer(r"(\s+)|(\\*)(\"+)|(.[^\s\\\"]*)", s.lstrip())
return ["".join(_iter_arg_ucrt(m, i)) for m in i]
cmd_meta = r"([\"\^\&\|\<\>\(\)\%\!])"
cmd_meta_or_space = r"[\s\"\^\&\|\<\>\(\)\%\!]"
cmd_meta_inside_quotes = r"([\"\%\!])"
def strip_carets_like_cmd(s: str, check: bool = True) -> str:
"""
Interpret caret escaping like ``cmd.exe`` does.
:param s: a command line string
:param check: raise an error on unquoted metacharacters
:returns: the string with any carets interpreted as an escape character
"""
def i() -> Iterator[str]:
quote_mode = False
for m in re.finditer(r"(\^.?)|(\")|([^\^\"]+)", s):
escaped, quote, text = m.groups()
if escaped:
if quote_mode:
yield escaped
if len(escaped) > 1:
if escaped[1] == '"':
quote_mode = False
elif check and escaped[1] in "!%":
raise MSLexError("Unquoted CMD metacharacters in string: " + repr(s))
else:
yield escaped[1:]
elif quote:
yield '"'
quote_mode = not quote_mode
else:
yield text
if check:
meta = cmd_meta_inside_quotes if quote_mode else cmd_meta
if re.search(meta, text):
raise MSLexError("Unquoted CMD metacharacters in string: " + repr(s))
return "".join(i())
def split(
s: str, like_cmd: bool = True, check: bool = True, ucrt: Optional[bool] = None
) -> List[str]:
"""
Split a string of command line arguments like DOS and Windows do.
:param s: a string to parse
:param like_cmd: parse it like ``cmd.exe``
:param ucrt: parse like UCRT
:param check: raise an error on unquoted metacharacters
:return: a list of parsed words
If ``like_cmd`` is true, then this will emulate both ``cmd.exe`` and
``CommandLineToArgvW``. Since ``cmd.exe`` is a shell, and can run
external programs, this function obviously cannot emulate everything it
does. However if the string passed in would be parsed by cmd as a
quoted literal, without command invocations like ``&whoami``, and
without string substitutions like ``%PATH%``, then this function will
split it accurately.
f ``like_cmd`` is false, then this will split the string like
``CommandLineToArgvW`` does.
If ``check`` is true, this will raise a ``ValueError`` if cmd
metacharacters occur in the string without being quoted.
If ``ucrt`` is true, this will parse like a modern C runtime. If it
is false, then it will parse like ``msvcrt.dll``. If it is None, then
it will raise an exception if the two methods disagree.
.. note:: This does not treat ``argv[0]`` specially as described in Microsoft's
`documentation`_, because this function does not have any way of knowing
if the first word of ``s`` is meant to be used as the program name. If
it is, then it should be a valid path name, so it can not contain
quotes, so both methods of interpretation will give the same answer.
.. _`documentation`: https://learn.microsoft.com/en-us/cpp/c-language/\
parsing-c-command-line-arguments
"""
if like_cmd and re.search(cmd_meta, s):
s = strip_carets_like_cmd(s, check=check)
if ucrt is None:
v = split_ucrt(s)
if v != split_msvcrt(s):
raise MSLexError(
"String is ambiguous, legacy and modern runtimes disagree: " + repr(s)
)
return v
elif ucrt:
return split_ucrt(s)
else:
return split_msvcrt(s)
def _escape_quotes(s: str) -> str:
"""
Escape any quotes found in string by prefixing them with an appropriate
number of backslashes.
"""
i = re.finditer(r"(\\*)(\"+)|(\\+|[^\\\"]+)", s)
def parts() -> Iterator[str]:
for m in i:
pos, end = m.span()
slashes, quotes, text = m.groups()
if quotes:
yield slashes
yield slashes
yield r"\"" * len(quotes)
else:
yield text
return "".join(parts())
def _wrap_in_quotes(s: str) -> str:
"""
Wrap a string whose internal quotes have been escaped in double quotes.
This handles adding the correct number of backslashes in front of the
closing quote.
"""
return '"' + re.sub(r"(\\+)$", r"\1\1", s) + '"'
def _quote_for_cmd(s: str) -> str:
"""
Quote a string for cmd. Split the string into sections that can be
quoted (or used verbatim), and runs of % and ! characters which must be
escaped with carets outside of quotes, and runs of quote characters,
which must be escaped with a caret for cmd.exe, and a backslash for
CommandLineToArgvW.
"""
def f(m) -> str:
quotable, subst = m.groups()
if quotable:
# A trailing backslash could combine a backslash escaping a
# quote, so it must be quoted
if re.search(cmd_meta_or_space, quotable) or quotable.endswith("\\"):
return _wrap_in_quotes(quotable)
else:
return quotable
elif subst:
return "^" + subst
else:
return '\\^"'
return re.sub(r'([^\%\!\"]+)|([\%\!])|"', f, s)
def quote(s: str, for_cmd: bool = True) -> str:
"""
Quote a string for use as a command line argument in DOS or Windows.
:param s: a string to quote
:param for_cmd: quote it for ``cmd.exe``
:return: quoted string
If ``for_cmd`` is true, then this will quote the strings so the result will
be parsed correctly by ``cmd.exe`` and then by ``CommandLineToArgvW``. If
false, then this will quote the strings so the result will
be parsed correctly when passed directly to ``CommandLineToArgvW``.
"""
if not s:
return '""'
if for_cmd:
if not re.search(cmd_meta_or_space, s):
return s
quoted = _quote_for_cmd(s)
if not re.search(r"[\s\"]", s):
# for example the string «x\!» can be quoted as «x\^!», but
# _quote_for_cmd would quote it as «"x\\"^!»
alt = re.sub(cmd_meta, r"^\1", s)
if len(alt) < len(quoted):
return alt
return quoted
else:
if not re.search(r"\s", s):
return _escape_quotes(s)
return _wrap_in_quotes(_escape_quotes(s))
def join(split_command: List[str], for_cmd: bool = True) -> str:
"""
Quote and concatenate a list of strings for use as a command line in DOS
or Windows.
:param split_command: a list of words to be quoted
:param for_cmd: quote it for ``cmd.exe``
:return: quoted command string
If ``for_cmd`` is true, then this will quote the strings so the result will
be parsed correctly by ``cmd.exe`` and then by ``CommandLineToArgvW``. If
false, then this will quote the strings so the result will
be parsed correctly when passed directly to ``CommandLineToArgvW``.
"""
return " ".join(quote(arg, for_cmd) for arg in split_command)
def split_cli() -> None:
import argparse
parser = argparse.ArgumentParser(
description="split a file into strings using windows-style quoting "
)
parser.add_argument("filename", nargs="?", help="file to split")
args = parser.parse_args()
if args.filename:
input = open(args.filename, "r") # type: TextIO
else:
input = sys.stdin
for s in split(input.read(), like_cmd=False):
print(s)

2
sshubl/vendor/mslex/exceptions.py vendored Normal file

@ -0,0 +1,2 @@
class MSLexError(ValueError):
"""Class for mslex errors"""

0
sshubl/vendor/mslex/py.typed vendored Normal file