skynet/skynet/dgpu/network.py

313 lines
9.2 KiB
Python

#!/usr/bin/python
import io
import json
import time
import logging
from pathlib import Path
from functools import partial
import asks
import trio
import anyio
from PIL import Image, UnidentifiedImageError
from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str
from skynet.constants import DEFAULT_IPFS_DOMAIN
from skynet.ipfs import AsyncIPFSHTTP, get_ipfs_file
from skynet.dgpu.errors import DGPUComputeError
REQUEST_UPDATE_TIME = 3
async def failable(fn: partial, ret_fail=None):
try:
return await fn()
except (
OSError,
json.JSONDecodeError,
asks.errors.RequestTimeout,
asks.errors.BadHttpResponse,
anyio.BrokenResourceError
):
return ret_fail
class SkynetGPUConnector:
def __init__(self, config: dict):
self.account = Name(config['account'])
self.permission = config['permission']
self.key = config['key']
self.node_url = config['node_url']
self.hyperion_url = config['hyperion_url']
self.cleos = CLEOS(
None, None, self.node_url, remote=self.node_url)
self.ipfs_gateway_url = None
if 'ipfs_gateway_url' in config:
self.ipfs_gateway_url = config['ipfs_gateway_url']
self.ipfs_url = config['ipfs_url']
self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url)
self.ipfs_domain = DEFAULT_IPFS_DOMAIN
if 'ipfs_domain' in config:
self.ipfs_domain = config['ipfs_domain']
self._wip_requests = {}
# blockchain helpers
async def get_work_requests_last_hour(self):
logging.info('get_work_requests_last_hour')
return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'queue',
index_position=2,
key_type='i64',
lower_bound=int(time.time()) - 3600
), ret_fail=[])
async def get_status_by_request_id(self, request_id: int):
logging.info('get_status_by_request_id')
return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', request_id, 'status'), ret_fail=[])
async def get_global_config(self):
logging.info('get_global_config')
rows = await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'config'))
if rows:
return rows[0]
else:
return None
async def get_worker_balance(self):
logging.info('get_worker_balance')
rows = await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'users',
index_position=1,
key_type='name',
lower_bound=self.account,
upper_bound=self.account
))
if rows:
return rows[0]['balance']
else:
return None
async def get_competitors_for_req(self, request_id: int) -> set:
competitors = [
status['worker']
for status in
(await self.get_status_by_request_id(request_id))
if status['worker'] != self.account
]
logging.info(f'competitors: {competitors}')
return set(competitors)
async def get_full_queue_snapshot(self):
snap = {
'requests': {},
'my_results': []
}
snap['queue'] = await self.get_work_requests_last_hour()
async def _run_and_save(d, key: str, fn, *args, **kwargs):
d[key] = await fn(*args, **kwargs)
async with trio.open_nursery() as n:
n.start_soon(_run_and_save, snap, 'my_results', self.find_my_results)
for req in snap['queue']:
n.start_soon(
_run_and_save, snap['requests'], req['id'], self.get_status_by_request_id, req['id'])
return snap
async def begin_work(self, request_id: int):
logging.info('begin_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'workbegin',
{
'worker': self.account,
'request_id': request_id,
'max_workers': 2
},
self.account, self.key,
permission=self.permission
)
)
async def cancel_work(self, request_id: int, reason: str):
logging.info('cancel_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'workcancel',
{
'worker': self.account,
'request_id': request_id,
'reason': reason
},
self.account, self.key,
permission=self.permission
)
)
async def maybe_withdraw_all(self):
logging.info('maybe_withdraw_all')
balance = await self.get_worker_balance()
if not balance:
return
balance_amount = float(balance.split(' ')[0])
if balance_amount > 0:
await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'withdraw',
{
'user': self.account,
'quantity': asset_from_str(balance)
},
self.account, self.key,
permission=self.permission
)
)
async def find_my_results(self):
logging.info('find_my_results')
return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'results',
index_position=4,
key_type='name',
lower_bound=self.account,
upper_bound=self.account
)
)
async def submit_work(
self,
request_id: int,
request_hash: str,
result_hash: str,
ipfs_hash: str
):
logging.info('submit_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'submit',
{
'worker': self.account,
'request_id': request_id,
'request_hash': Checksum256(request_hash),
'result_hash': Checksum256(result_hash),
'ipfs_hash': ipfs_hash
},
self.account, self.key,
permission=self.permission
)
)
# IPFS helpers
async def publish_on_ipfs(self, raw, typ: str = 'png'):
Path('ipfs-staging').mkdir(exist_ok=True)
logging.info('publish_on_ipfs')
target_file = ''
match typ:
case 'png':
raw: Image
target_file = 'ipfs-staging/image.png'
raw.save(target_file)
case _:
raise ValueError(f'Unsupported output type: {typ}')
if self.ipfs_gateway_url:
# check peer connections, reconnect to skynet gateway if not
gateway_id = Path(self.ipfs_gateway_url).name
peers = await self.ipfs_client.peers()
if gateway_id not in [p['Peer'] for p in peers]:
await self.ipfs_client.connect(self.ipfs_gateway_url)
file_info = await self.ipfs_client.add(Path(target_file))
file_cid = file_info['Hash']
await self.ipfs_client.pin(file_cid)
return file_cid
async def get_input_data(self, ipfs_hash: str) -> tuple[bytes, str]:
input_type = 'none'
if ipfs_hash == '':
return b'', input_type
results = {}
ipfs_link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}'
ipfs_link_legacy = ipfs_link + '/image.png'
async with trio.open_nursery() as n:
async def get_and_set_results(link: str):
res = await get_ipfs_file(link, timeout=1)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
else:
try:
# attempt to decode as image
results[link] = Image.open(io.BytesIO(res.raw))
input_type = 'png'
n.cancel_scope.cancel()
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
n.start_soon(
get_and_set_results, ipfs_link)
n.start_soon(
get_and_set_results, ipfs_link_legacy)
input_data = None
if ipfs_link_legacy in results:
input_data = results[ipfs_link_legacy]
if ipfs_link in results:
input_data = results[ipfs_link]
if input_data == None:
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
return input_data, input_type