From cc75c930fc35a72020453c0e83ca98b5a8bb15b4 Mon Sep 17 00:00:00 2001 From: Nicolas Werner Date: Fri, 10 May 2024 13:36:56 +0200 Subject: [PATCH] Update flat-manager-client --- scripts/flat-manager-client | 172 ++++++++++++++++++++++++++++-------- 1 file changed, 133 insertions(+), 39 deletions(-) diff --git a/scripts/flat-manager-client b/scripts/flat-manager-client index ec9cd2e4..89a23a35 100755 --- a/scripts/flat-manager-client +++ b/scripts/flat-manager-client @@ -26,7 +26,6 @@ import os import sys import time import traceback -from yarl import URL from argparse import ArgumentParser from functools import reduce from urllib.parse import urljoin, urlparse, urlsplit, urlunparse, urlunsplit @@ -72,8 +71,26 @@ class ApiError(Exception): def __str__(self): return "Api call to %s failed with status %d, details: %s" % (self.url, self.status, self.body) +class ServerApiError(ApiError): + def __init__(self, response, body): + super().__init__(response, body) -TENACITY_RETRY_EXCEPTIONS = (retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError) | retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerConnectionError)) + +class FailedJobError(Exception): + def __init__(self, job): + self.job = job + + def repr(self): + return { + "type": "job", + "job": self.job + } + + def __str__(self): + return "Job failed: %s" % (self.job) + + +TENACITY_RETRY_EXCEPTIONS = (retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError) | retry_if_exception_type(ServerApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerConnectionError) | retry_if_exception_type(aiohttp.client_exceptions.ClientOSError)) TENACITY_STOP_AFTER = stop_after_delay(300) TENACITY_WAIT_BETWEEN = wait_random_exponential(multiplier=1, max=60) @@ -215,7 +232,7 @@ async def missing_objects(session, build_url, token, wanted): @retry( stop=TENACITY_STOP_AFTER, wait=TENACITY_WAIT_BETWEEN, - retry=(retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError)), + retry=TENACITY_RETRY_EXCEPTIONS, reraise=True, ) async def upload_files(session, build_url, token, files): @@ -228,7 +245,9 @@ async def upload_files(session, build_url, token, files): writer.headers['Authorization'] = 'Bearer ' + token resp = await session.request("post", build_url + '/upload', data=writer, headers=writer.headers) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) @@ -285,11 +304,17 @@ async def upload_objects(session, repo_path, build_url, token, objects): retry=TENACITY_RETRY_EXCEPTIONS, reraise=True, ) -async def create_ref(session, build_url, token, ref, commit): +async def create_ref(session, build_url, token, ref, commit, build_log_url=None): print("Creating ref %s with commit %s" % (ref, commit)) - resp = await session.post(build_url + "/build_ref", headers={'Authorization': 'Bearer ' + token}, json= { "ref": ref, "commit": commit} ) + resp = await session.post( + build_url + "/build_ref", + headers={ 'Authorization': 'Bearer ' + token }, + json= { "ref": ref, "commit": commit, "build-log-url": build_log_url } + ) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) data = await resp.json() @@ -306,7 +331,9 @@ async def add_extra_ids(session, build_url, token, extra_ids): print("Adding extra ids %s" % (extra_ids)) resp = await session.post(build_url + "/add_extra_ids", headers={'Authorization': 'Bearer ' + token}, json= { "ids": extra_ids} ) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) data = await resp.json() @@ -322,7 +349,10 @@ async def add_extra_ids(session, build_url, token, extra_ids): async def get_build(session, build_url, token): resp = await session.get(build_url, headers={'Authorization': 'Bearer ' + token}) if resp.status != 200: - raise ApiError(resp, await resp.text()) + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: + raise ApiError(resp, await resp.text()) data = await resp.json() return data @@ -341,7 +371,9 @@ def reparse_job_results(job): async def get_job(session, job_url, token): resp = await session.get(job_url, headers={'Authorization': 'Bearer ' + token}, json={}) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) data = await resp.json() return data @@ -376,7 +408,7 @@ async def wait_for_job(session, job_url, token): if start_after and start_after > now: print("Waiting %d seconds before starting job" % (int(start_after - now))) if job_status > 0 and old_job_status == 0: - print("/ Job was started") + print("/ Job was started"); old_job_status = job_status log = job['log'] if len(log) > 0: @@ -388,9 +420,10 @@ async def wait_for_job(session, job_url, token): iterations_since_change=iterations_since_change+1 if job_status > 1: if job_status == 2: - print("\ Job completed successfully") + print("\\ Job completed successfully") else: - print("\ Job failed") + print("\\ Job failed") + raise FailedJobError(job) return job else: iterations_since_change=4 # Start at 4 so we ramp up the delay faster @@ -421,6 +454,51 @@ async def wait_for_job(session, job_url, token): sleep_time=60 time.sleep(sleep_time) +@retry( + stop=TENACITY_STOP_AFTER, + wait=TENACITY_WAIT_BETWEEN, + retry=TENACITY_RETRY_EXCEPTIONS, + reraise=True, +) +async def wait_for_checks(session, build_url, token): + print("Waiting for checks, if any...") + while True: + resp = await session.get(build_url + "/extended", headers={'Authorization': 'Bearer ' + token}) + async with resp: + if resp.status == 404: + return + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: + raise ApiError(resp, await resp.text()) + + build = await resp.json() + + # wait for the repo to be validated + if build["build"]["repo_state"] == 1: + time.sleep(2) + else: + break + + for check in build["checks"]: + print("Waiting for check: %s" % (check["check_name"])) + job_url = build_url + "/check/" + check["check_name"] + "/job" + await wait_for_job(session, job_url, token) + + resp = await session.get(build_url + "/extended", headers={'Authorization': 'Bearer ' + token}) + async with resp: + if resp.status == 404: + return + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: + raise ApiError(resp, await resp.text()) + + build = await resp.json() + for check in build["checks"]: + if check["status"] == 3: + print("\\ Check {} has failed".format(check["check_name"])) + raise FailedJobError(check) @retry( stop=TENACITY_STOP_AFTER, @@ -438,18 +516,23 @@ async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, to json['token_type'] = token_type resp = await session.post(build_url + "/commit", headers={'Authorization': 'Bearer ' + token}, json=json) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) job = await resp.json() - job_url = keep_host(resp.headers['location'], build_url) + job_url = resp.headers['location']; if wait: - print("Waiting for commit job") - job = await wait_for_job(session, job_url, token) + pass + + print("Waiting for commit job") + await wait_for_checks(session, build_url, token) + job = await wait_for_job(session, job_url, token) reparse_job_results(job) - job["location"] = keep_host(job_url, build_url) + job["location"] = job_url return job @@ -469,9 +552,17 @@ async def publish_build(session, build_url, wait, token): if isinstance(body, str): body = json.loads(body) - if body.get("current-state") == "published": + current_state = body.get("current-state", None) + + if current_state == "published": print("the build has been already published") return {} + elif current_state == "failed": + print("the build has failed") + raise ApiError(resp, await resp.text()) + elif current_state == "validating": + print("the build is still being validated or held for review") + return {} except: pass @@ -479,14 +570,14 @@ async def publish_build(session, build_url, wait, token): raise ApiError(resp, await resp.text()) job = await resp.json() - job_url = keep_host(resp.headers['location'], build_url) + job_url = resp.headers['location']; if wait: print("Waiting for publish job") - job = await wait_for_job(session, job_url, token) + job = await wait_for_job(session, job_url, token); reparse_job_results(job) - job["location"] = keep_host(job_url, build_url) + job["location"] = job_url return job @@ -500,7 +591,9 @@ async def purge_build(session, build_url, token): print("Purging build %s" % (build_url)) resp = await session.post(build_url + "/purge", headers={'Authorization': 'Bearer ' + token}, json= {} ) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) return await resp.json() @@ -520,7 +613,9 @@ async def create_token(session, manager_url, token, name, subject, scope, durati "duration": duration, }) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) return await resp.json() @@ -543,12 +638,16 @@ async def create_command(session, args): json["app-id"] = args.app_id if args.public_download is not None: json["public-download"] = args.public_download + if args.build_log_url is not None: + json["build-log-url"] = args.build_log_url resp = await session.post(build_url, headers={'Authorization': 'Bearer ' + args.token}, json=json) async with resp: - if resp.status != 200: + if resp.status >= 500: + raise ServerApiError(resp, await resp.text()) + elif resp.status != 200: raise ApiError(resp, await resp.text()) data = await resp.json() - data["location"] = keep_host(resp.headers['location'], build_url) + data["location"] = resp.headers['location'] if not args.print_output: print(resp.headers['location']) return data @@ -566,12 +665,6 @@ def should_skip_delta(id, globs): return True return False -# work around for url_for returning http urls when flat-manager is behind a reverse proxy and aiohttp not keeping the Authorization header across redirects in version < 4 -def keep_host(location, original): - loc_url = URL(location) - org_url = URL(original) - return str(loc_url.with_scheme(org_url.scheme).with_host(org_url.host).with_port(org_url.port)) - def build_url_to_api(build_url): parts = urlparse(build_url) path = os.path.dirname(os.path.dirname(parts.path)) @@ -637,7 +730,7 @@ async def push_command(session, args): # Then the refs for ref, commit in refs.items(): - await create_ref(session, args.build_url, token, ref, commit) + await create_ref(session, args.build_url, token, ref, commit, build_log_url=args.build_log_url) # Then any extra ids if args.extra_id: @@ -659,11 +752,11 @@ async def push_command(session, args): update_job_url = build_url_to_api(args.build_url) + "/job/" + str(update_job_id) if args.wait_update: print("Waiting for repo update job") - update_job = await wait_for_job (session, update_job_url, token) + update_job = await wait_for_job (session, update_job_url, token); else: update_job = await get_job(session, update_job_url, token) reparse_job_results(update_job) - update_job["location"] = keep_host(update_job_url, update_job_url) + update_job["location"] = update_job_url data = await get_build(session, args.build_url, args.token) if commit_job: @@ -686,11 +779,11 @@ async def publish_command(session, args): update_job_url = build_url_to_api(args.build_url) + "/job/" + str(update_job_id) if args.wait_update: print("Waiting for repo update job") - update_job = await wait_for_job(session, update_job_url, args.token) + update_job = await wait_for_job(session, update_job_url, args.token); else: update_job = await get_job(session, update_job_url, args.token) reparse_job_results(update_job) - update_job["location"] = keep_host(update_job_url, args.build_url) + update_job["location"] = update_job_url return job async def purge_command(session, args): @@ -736,6 +829,7 @@ if __name__ == '__main__': create_parser.add_argument('app_id', nargs='?', help='app ID') create_parser.add_argument('--public_download', action='store_true', default=None, help='allow public read access to the build repo') create_parser.add_argument('--no_public_download', action='store_false', dest='public_download', default=None, help='allow public read access to the build repo') + create_parser.add_argument('--build-log-url', help='Set URL of the build log for the whole build') create_parser.set_defaults(func=create_command) push_parser = subparsers.add_parser('push', help='Push to repo manager') @@ -757,6 +851,7 @@ if __name__ == '__main__': push_parser.add_argument('--end-of-life', help='Set end of life') push_parser.add_argument('--end-of-life-rebase', help='Set new ID which will supercede the current one') push_parser.add_argument('--token-type', help='Set token type', type=int) + push_parser.add_argument('--build-log-url', help='Set URL of the build log for each uploaded ref') push_parser.set_defaults(func=push_command) commit_parser = subparsers.add_parser('commit', help='Commit build') @@ -834,7 +929,7 @@ if __name__ == '__main__': # Something called sys.exit(), lets just exit res = 1 raise # Pass on regular exit callse - except ApiError as e: + except (ApiError, FailedJobError) as e: eprint(str(e)) output = { "command": args.subparser_name, @@ -875,4 +970,3 @@ if __name__ == '__main__': f.write("\n") f.close() exit(res) -