Add option for listing auth servers
This commit is contained in:
parent
f52f8988f4
commit
37eeb73c47
2 changed files with 35 additions and 12 deletions
|
@ -35,6 +35,17 @@ def command(help: str) -> Callable[[Callable], Callable]:
|
||||||
continue
|
continue
|
||||||
if value is not None and (questions[key]["type"] != "confirm" or value != "null"):
|
if value is not None and (questions[key]["type"] != "confirm" or value != "null"):
|
||||||
questions.pop(key, None)
|
questions.pop(key, None)
|
||||||
|
try:
|
||||||
|
required_unless = questions[key].pop("required_unless")
|
||||||
|
if isinstance(required_unless, str) and kwargs[required_unless]:
|
||||||
|
questions.pop(key)
|
||||||
|
elif isinstance(required_unless, dict):
|
||||||
|
for k, v in required_unless.items():
|
||||||
|
if kwargs.get(v, object()) == v:
|
||||||
|
questions.pop(key)
|
||||||
|
break
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
question_list = list(questions.values())
|
question_list = list(questions.values())
|
||||||
question_list.reverse()
|
question_list.reverse()
|
||||||
resp = prompt(question_list, keyboard_interrupt_msg="Aborted!")
|
resp = prompt(question_list, keyboard_interrupt_msg="Aborted!")
|
||||||
|
@ -65,7 +76,8 @@ yesno.__name__ = "yes/no"
|
||||||
def option(short: str, long: str, message: str = None, help: str = None,
|
def option(short: str, long: str, message: str = None, help: str = None,
|
||||||
click_type: Union[str, Callable[[str], Any]] = None, inq_type: str = None,
|
click_type: Union[str, Callable[[str], Any]] = None, inq_type: str = None,
|
||||||
validator: Validator = None, required: bool = False, default: str = None,
|
validator: Validator = None, required: bool = False, default: str = None,
|
||||||
is_flag: bool = False, prompt: bool = True) -> Callable[[Callable], Callable]:
|
is_flag: bool = False, prompt: bool = True, required_unless: str = None
|
||||||
|
) -> Callable[[Callable], Callable]:
|
||||||
if not message:
|
if not message:
|
||||||
message = long[2].upper() + long[3:]
|
message = long[2].upper() + long[3:]
|
||||||
click_type = validator.click_type if isinstance(validator, ClickValidator) else click_type
|
click_type = validator.click_type if isinstance(validator, ClickValidator) else click_type
|
||||||
|
@ -85,9 +97,11 @@ def option(short: str, long: str, message: str = None, help: str = None,
|
||||||
"name": long[2:],
|
"name": long[2:],
|
||||||
"message": message,
|
"message": message,
|
||||||
}
|
}
|
||||||
|
if required_unless is not None:
|
||||||
|
q["required_unless"] = required_unless
|
||||||
if default is not None:
|
if default is not None:
|
||||||
q["default"] = default
|
q["default"] = default
|
||||||
if required:
|
if required or required_unless is not None:
|
||||||
q["validator"] = Required(validator)
|
q["validator"] = Required(validator)
|
||||||
elif validator:
|
elif validator:
|
||||||
q["validator"] = validator
|
q["validator"] = validator
|
||||||
|
|
|
@ -29,7 +29,6 @@ history_count: int = 10
|
||||||
|
|
||||||
enc = functools.partial(quote, safe="")
|
enc = functools.partial(quote, safe="")
|
||||||
|
|
||||||
|
|
||||||
friendly_errors = {
|
friendly_errors = {
|
||||||
"server_not_found": "Registration target server not found.\n\n"
|
"server_not_found": "Registration target server not found.\n\n"
|
||||||
"To log in or register through maubot, you must add the server to the\n"
|
"To log in or register through maubot, you must add the server to the\n"
|
||||||
|
@ -39,24 +38,33 @@ friendly_errors = {
|
||||||
|
|
||||||
|
|
||||||
@cliq.command(help="Log into a Matrix account via the Maubot server")
|
@cliq.command(help="Log into a Matrix account via the Maubot server")
|
||||||
@cliq.option("-h", "--homeserver", help="The homeserver to log into", required=True)
|
@cliq.option("-h", "--homeserver", help="The homeserver to log into", required_unless="list")
|
||||||
@cliq.option("-u", "--username", help="The username to log in with", required=True)
|
@cliq.option("-u", "--username", help="The username to log in with", required_unless="list")
|
||||||
@cliq.option("-p", "--password", help="The password to log in with", inq_type="password",
|
@cliq.option("-p", "--password", help="The password to log in with", inq_type="password",
|
||||||
required=True)
|
required_unless="list")
|
||||||
@cliq.option("-s", "--server", help="The maubot instance to log in through", default="",
|
@cliq.option("-s", "--server", help="The maubot instance to log in through", default="",
|
||||||
required=False, prompt=False)
|
required=False, prompt=False)
|
||||||
@click.option("-r", "--register", help="Register instead of logging in", is_flag=True,
|
@click.option("-r", "--register", help="Register instead of logging in", is_flag=True,
|
||||||
default=False)
|
default=False)
|
||||||
def auth(homeserver: str, username: str, password: str, server: str, register: bool) -> None:
|
@click.option("-l", "--list", help="List available homeservers", is_flag=True, default=False)
|
||||||
|
def auth(homeserver: str, username: str, password: str, server: str, register: bool, list: bool
|
||||||
|
) -> None:
|
||||||
server, token = get_token(server)
|
server, token = get_token(server)
|
||||||
if not token:
|
if not token:
|
||||||
return
|
return
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
if list:
|
||||||
|
url = f"{server}/_matrix/maubot/v1/client/auth/servers"
|
||||||
|
with urlopen(Request(url, headers=headers)) as resp_data:
|
||||||
|
resp = json.load(resp_data)
|
||||||
|
print(f"{Fore.GREEN}Available Matrix servers for registration and login:{Fore.RESET}")
|
||||||
|
for server in resp.keys():
|
||||||
|
print(f"* {Fore.CYAN}{server}{Fore.RESET}")
|
||||||
|
return
|
||||||
endpoint = "register" if register else "login"
|
endpoint = "register" if register else "login"
|
||||||
req = Request(f"{server}/_matrix/maubot/v1/client/auth/{enc(homeserver)}/{enc(endpoint)}",
|
headers["Content-Type"] = "application/json"
|
||||||
headers={
|
url = f"{server}/_matrix/maubot/v1/client/auth/{enc(homeserver)}/{endpoint}"
|
||||||
"Authorization": f"Bearer {token}",
|
req = Request(url, headers=headers,
|
||||||
"Content-Type": "application/json",
|
|
||||||
},
|
|
||||||
data=json.dumps({
|
data=json.dumps({
|
||||||
"username": username,
|
"username": username,
|
||||||
"password": password,
|
"password": password,
|
||||||
|
@ -68,6 +76,7 @@ def auth(homeserver: str, username: str, password: str, server: str, register: b
|
||||||
print(f"{Fore.GREEN}Successfully {action} "
|
print(f"{Fore.GREEN}Successfully {action} "
|
||||||
f"{Fore.CYAN}{resp['user_id']}{Fore.GREEN}.")
|
f"{Fore.CYAN}{resp['user_id']}{Fore.GREEN}.")
|
||||||
print(f"{Fore.GREEN}Access token: {Fore.CYAN}{resp['access_token']}{Fore.RESET}")
|
print(f"{Fore.GREEN}Access token: {Fore.CYAN}{resp['access_token']}{Fore.RESET}")
|
||||||
|
print(f"{Fore.GREEN}Device ID: {Fore.CYAN}{resp['device_id']}{Fore.RESET}")
|
||||||
except HTTPError as e:
|
except HTTPError as e:
|
||||||
try:
|
try:
|
||||||
err_data = json.load(e)
|
err_data = json.load(e)
|
||||||
|
|
Loading…
Reference in a new issue