Update flat-manager-client

This commit is contained in:
Nicolas Werner 2022-12-27 04:30:32 +01:00
parent f98b289ba2
commit 6529240be8
No known key found for this signature in database
GPG key ID: C8D75E610773F2D9

View file

@ -31,7 +31,8 @@ from functools import reduce
from urllib.parse import urljoin, urlparse, urlsplit, urlunparse, urlunsplit from urllib.parse import urljoin, urlparse, urlsplit, urlunparse, urlunsplit
import aiohttp import aiohttp
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed import aiohttp.client_exceptions
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_random_exponential
import gi import gi
gi.require_version('OSTree', '1.0') gi.require_version('OSTree', '1.0')
@ -55,7 +56,7 @@ class ApiError(Exception):
self.status = response.status self.status = response.status
try: try:
self.body = json.loads(response); self.body = json.loads(body)
except: except:
self.body = {"status": self.status, "error-type": "no-error", "message": "No json error details from server"} self.body = {"status": self.status, "error-type": "no-error", "message": "No json error details from server"}
@ -71,6 +72,10 @@ class ApiError(Exception):
return "Api call to %s failed with status %d, details: %s" % (self.url, self.status, self.body) return "Api call to %s failed with status %d, details: %s" % (self.url, self.status, self.body)
TENACITY_RETRY_EXCEPTIONS = (retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError) | retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerConnectionError))
TENACITY_STOP_AFTER = stop_after_delay(300)
TENACITY_WAIT_BETWEEN = wait_random_exponential(multiplier=1, max=60)
# This is similar to the regular payload, but opens the file lazily # This is similar to the regular payload, but opens the file lazily
class AsyncNamedFilePart(aiohttp.payload.Payload): class AsyncNamedFilePart(aiohttp.payload.Payload):
def __init__(self, def __init__(self,
@ -181,6 +186,13 @@ def chunks(l, n):
for i in range(0, len(l), n): for i in range(0, len(l), n):
yield l[i:i + n] yield l[i:i + n]
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def missing_objects(session, build_url, token, wanted): async def missing_objects(session, build_url, token, wanted):
missing=[] missing=[]
for chunk in chunks(wanted, 2000): for chunk in chunks(wanted, 2000):
@ -199,6 +211,12 @@ async def missing_objects(session, build_url, token, wanted):
missing.extend(data["missing"]) missing.extend(data["missing"])
return missing return missing
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=(retry_if_exception_type(ApiError) | retry_if_exception_type(aiohttp.client_exceptions.ServerDisconnectedError)),
reraise=True,
)
async def upload_files(session, build_url, token, files): async def upload_files(session, build_url, token, files):
if len(files) == 0: if len(files) == 0:
return return
@ -212,6 +230,7 @@ async def upload_files(session, build_url, token, files):
if resp.status != 200: if resp.status != 200:
raise ApiError(resp, await resp.text()) raise ApiError(resp, await resp.text())
async def upload_deltas(session, repo_path, build_url, token, deltas, refs, ignore_delta): async def upload_deltas(session, repo_path, build_url, token, deltas, refs, ignore_delta):
if not len(deltas): if not len(deltas):
return return
@ -258,6 +277,13 @@ async def upload_objects(session, repo_path, build_url, token, objects):
# Upload any remainder # Upload any remainder
await upload_files(session, build_url, token, req) await upload_files(session, build_url, token, req)
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def create_ref(session, build_url, token, ref, commit): async def create_ref(session, build_url, token, ref, commit):
print("Creating ref %s with commit %s" % (ref, commit)) print("Creating ref %s with commit %s" % (ref, commit))
resp = await session.post(build_url + "/build_ref", headers={'Authorization': 'Bearer ' + token}, json= { "ref": ref, "commit": commit} ) resp = await session.post(build_url + "/build_ref", headers={'Authorization': 'Bearer ' + token}, json= { "ref": ref, "commit": commit} )
@ -268,6 +294,13 @@ async def create_ref(session, build_url, token, ref, commit):
data = await resp.json() data = await resp.json()
return data return data
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def add_extra_ids(session, build_url, token, extra_ids): async def add_extra_ids(session, build_url, token, extra_ids):
print("Adding extra ids %s" % (extra_ids)) print("Adding extra ids %s" % (extra_ids))
resp = await session.post(build_url + "/add_extra_ids", headers={'Authorization': 'Bearer ' + token}, json= { "ids": extra_ids} ) resp = await session.post(build_url + "/add_extra_ids", headers={'Authorization': 'Bearer ' + token}, json= { "ids": extra_ids} )
@ -278,6 +311,13 @@ async def add_extra_ids(session, build_url, token, extra_ids):
data = await resp.json() data = await resp.json()
return data return data
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def get_build(session, build_url, token): async def get_build(session, build_url, token):
resp = await session.get(build_url, headers={'Authorization': 'Bearer ' + token}) resp = await session.get(build_url, headers={'Authorization': 'Bearer ' + token})
if resp.status != 200: if resp.status != 200:
@ -290,6 +330,13 @@ def reparse_job_results(job):
job["results"] = json.loads(job.get("results", "{}")) job["results"] = json.loads(job.get("results", "{}"))
return job return job
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def get_job(session, job_url, token): async def get_job(session, job_url, token):
resp = await session.get(job_url, headers={'Authorization': 'Bearer ' + token}, json={}) resp = await session.get(job_url, headers={'Authorization': 'Bearer ' + token}, json={})
async with resp: async with resp:
@ -298,6 +345,13 @@ async def get_job(session, job_url, token):
data = await resp.json() data = await resp.json()
return data return data
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def wait_for_job(session, job_url, token): async def wait_for_job(session, job_url, token):
reported_delay = False reported_delay = False
old_job_status = 0 old_job_status = 0
@ -366,6 +420,13 @@ async def wait_for_job(session, job_url, token):
sleep_time=60 sleep_time=60
time.sleep(sleep_time) time.sleep(sleep_time)
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, token): async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, token):
print("Committing build %s" % (build_url)) print("Committing build %s" % (build_url))
json = { json = {
@ -390,15 +451,24 @@ async def commit_build(session, build_url, eol, eol_rebase, token_type, wait, to
job["location"] = job_url job["location"] = job_url
return job return job
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def publish_build(session, build_url, wait, token): async def publish_build(session, build_url, wait, token):
print("Publishing build %s" % (build_url)) print("Publishing build %s" % (build_url))
resp = await session.post(build_url + "/publish", headers={'Authorization': 'Bearer ' + token}, json= { } ) resp = await session.post(build_url + "/publish", headers={'Authorization': 'Bearer ' + token}, json= { } )
async with resp: async with resp:
if resp.status == 400: if resp.status == 400:
body = await resp.text() body = await resp.json()
try: try:
msg = json.loads(body) if isinstance(body, str):
if msg.get("current-state", "") == "published": body = json.loads(body)
if body.get("current-state") == "published":
print("the build has been already published") print("the build has been already published")
return {} return {}
except: except:
@ -418,6 +488,13 @@ async def publish_build(session, build_url, wait, token):
job["location"] = job_url job["location"] = job_url
return job return job
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def purge_build(session, build_url, token): async def purge_build(session, build_url, token):
print("Purging build %s" % (build_url)) print("Purging build %s" % (build_url))
resp = await session.post(build_url + "/purge", headers={'Authorization': 'Bearer ' + token}, json= {} ) resp = await session.post(build_url + "/purge", headers={'Authorization': 'Bearer ' + token}, json= {} )
@ -426,6 +503,13 @@ async def purge_build(session, build_url, token):
raise ApiError(resp, await resp.text()) raise ApiError(resp, await resp.text())
return await resp.json() return await resp.json()
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def create_token(session, manager_url, token, name, subject, scope, duration): async def create_token(session, manager_url, token, name, subject, scope, duration):
token_url = urljoin(manager_url, "api/v1/token_subset") token_url = urljoin(manager_url, "api/v1/token_subset")
resp = await session.post(token_url, headers={'Authorization': 'Bearer ' + token}, json = { resp = await session.post(token_url, headers={'Authorization': 'Bearer ' + token}, json = {
@ -442,11 +526,23 @@ async def create_token(session, manager_url, token, name, subject, scope, durati
def get_object_multipart(repo_path, object): def get_object_multipart(repo_path, object):
return AsyncNamedFilePart(repo_path + "/objects/" + object[:2] + "/" + object[2:], filename=object) return AsyncNamedFilePart(repo_path + "/objects/" + object[:2] + "/" + object[2:], filename=object)
@retry(
stop=TENACITY_STOP_AFTER,
wait=TENACITY_WAIT_BETWEEN,
retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True,
)
async def create_command(session, args): async def create_command(session, args):
build_url = urljoin(args.manager_url, "api/v1/build") build_url = urljoin(args.manager_url, "api/v1/build")
resp = await session.post(build_url, headers={'Authorization': 'Bearer ' + args.token}, json={ json = {
"repo": args.repo "repo": args.repo
}) }
if args.app_id is not None:
json["app-id"] = args.app_id
if args.public_download is not None:
json["public-download"] = args.public_download
resp = await session.post(build_url, headers={'Authorization': 'Bearer ' + args.token}, json=json)
async with resp: async with resp:
if resp.status != 200: if resp.status != 200:
raise ApiError(resp, await resp.text()) raise ApiError(resp, await resp.text())
@ -475,9 +571,9 @@ def build_url_to_api(build_url):
return urlunparse((parts.scheme, parts.netloc, path, None, None, None)) return urlunparse((parts.scheme, parts.netloc, path, None, None, None))
@retry( @retry(
stop=stop_after_attempt(6), stop=TENACITY_STOP_AFTER,
wait=wait_fixed(10), wait=TENACITY_WAIT_BETWEEN,
retry=retry_if_exception_type(ApiError), retry=TENACITY_RETRY_EXCEPTIONS,
reraise=True, reraise=True,
) )
async def push_command(session, args): async def push_command(session, args):
@ -630,6 +726,9 @@ if __name__ == '__main__':
create_parser = subparsers.add_parser('create', help='Create new build') create_parser = subparsers.add_parser('create', help='Create new build')
create_parser.add_argument('manager_url', help='remote repo manager url') create_parser.add_argument('manager_url', help='remote repo manager url')
create_parser.add_argument('repo', help='repo name') create_parser.add_argument('repo', help='repo name')
create_parser.add_argument('app_id', nargs='?', help='app ID')
create_parser.add_argument('--public_download', action='store_true', default=None, help='allow public read access to the build repo')
create_parser.add_argument('--no_public_download', action='store_false', dest='public_download', default=None, help='allow public read access to the build repo')
create_parser.set_defaults(func=create_command) create_parser.set_defaults(func=create_command)
push_parser = subparsers.add_parser('push', help='Push to repo manager') push_parser = subparsers.add_parser('push', help='Push to repo manager')
@ -717,8 +816,7 @@ if __name__ == '__main__':
res = 1 res = 1
output = None output = None
try: try:
loop = asyncio.get_event_loop() result = asyncio.run(run_with_session(args))
result = loop.run_until_complete(run_with_session(args))
output = { output = {
"command": args.subparser_name, "command": args.subparser_name,
@ -770,3 +868,4 @@ if __name__ == '__main__':
f.write("\n") f.write("\n")
f.close() f.close()
exit(res) exit(res)