diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ca18f2c4..f799bc22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,13 +42,16 @@ jobs: - name: Checkout uses: actions/checkout@v3 + - name: Build DB container + run: docker build -t piker:elastic dockering/elastic + - name: Setup python uses: actions/setup-python@v3 with: python-version: '3.10' - name: Install dependencies - run: pip install -U . -r requirements-test.txt -r requirements.txt --upgrade-strategy eager + run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager - name: Test suite run: pytest tests -rs diff --git a/dockering/elastic/Dockerfile b/dockering/elastic/Dockerfile new file mode 100644 index 00000000..f497a7a3 --- /dev/null +++ b/dockering/elastic/Dockerfile @@ -0,0 +1,11 @@ +FROM elasticsearch:7.17.4 + +ENV ES_JAVA_OPTS "-Xms2g -Xmx2g" +ENV ELASTIC_USERNAME "elastic" +ENV ELASTIC_PASSWORD "password" + +COPY elasticsearch.yml /usr/share/elasticsearch/config/ + +RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" + +EXPOSE 19200 diff --git a/dockering/elastic/elasticsearch.yml b/dockering/elastic/elasticsearch.yml new file mode 100644 index 00000000..fdaa905f --- /dev/null +++ b/dockering/elastic/elasticsearch.yml @@ -0,0 +1,5 @@ +network.host: 0.0.0.0 + +http.port: 19200 + +discovery.type: single-node diff --git a/piker/_daemon.py b/piker/_daemon.py index f4acf9f3..8983eccc 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -41,6 +41,9 @@ from .log import ( ) from .brokers import get_brokermod +from pprint import pformat +from functools import partial + log = get_logger(__name__) @@ -313,6 +316,7 @@ async def open_piker_runtime( @acm async def open_pikerd( + loglevel: str | None = None, # XXX: you should pretty much never want debug mode @@ -320,6 +324,10 @@ async def open_pikerd( debug_mode: bool = False, registry_addr: None | tuple[str, int] = None, + # db init flags + tsdb: bool = False, + es: bool = False, + ) -> Services: ''' Start a root piker daemon who's lifetime extends indefinitely until @@ -349,12 +357,54 @@ async def open_pikerd( ): assert root_actor.accept_addr == reg_addr + if tsdb: + from piker.data._ahab import start_ahab + from piker.data.marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + start_ahab, + 'marketstored', + start_marketstore, + + ) + log.info( + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + if es: + from piker.data._ahab import start_ahab + from piker.data.elastic import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + start_timeout=240.0 # high cause ci + ) + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + # assign globally for future daemon/task creation Services.actor_n = actor_nursery Services.service_n = service_nursery Services.debug_mode = debug_mode + + try: yield Services + finally: # TODO: is this more clever/efficient? # if 'samplerd' in Services.service_tasks: @@ -390,6 +440,8 @@ async def maybe_open_runtime( async def maybe_open_pikerd( loglevel: Optional[str] = None, registry_addr: None | tuple = None, + tsdb: bool = False, + es: bool = False, **kwargs, @@ -439,6 +491,8 @@ async def maybe_open_pikerd( loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, + tsdb=tsdb, + es=es, ) as service_manager: # in the case where we're starting up the diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 07484634..9b6f225c 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -20,6 +20,7 @@ CLI commons. ''' import os from pprint import pformat +from functools import partial import click import trio @@ -48,6 +49,11 @@ log = get_logger('cli') is_flag=True, help='Enable local ``marketstore`` instance' ) +@click.option( + '--es', + is_flag=True, + help='Enable local ``elasticsearch`` instance' +) def pikerd( loglevel: str, host: str, @@ -55,11 +61,13 @@ def pikerd( tl: bool, pdb: bool, tsdb: bool, + es: bool, ): ''' Spawn the piker broker-daemon. ''' + from .._daemon import open_pikerd log = get_console_log(loglevel) @@ -80,9 +88,10 @@ def pikerd( ) async def main(): - async with ( open_pikerd( + tsdb=tsdb, + es=es, loglevel=loglevel, debug_mode=pdb, registry_addr=reg_addr, @@ -90,23 +99,6 @@ def pikerd( ), # normally delivers a ``Services`` handle trio.open_nursery() as n, ): - if tsdb: - from piker.data._ahab import start_ahab - from piker.data.marketstore import start_marketstore - - log.info('Spawning `marketstore` supervisor') - ctn_ready, config, (cid, pid) = await n.start( - start_ahab, - 'marketstored', - start_marketstore, - - ) - log.info( - f'`marketstored` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) await trio.sleep_forever() @@ -213,6 +205,7 @@ def services(config, tl, ports): def _load_clis() -> None: from ..data import marketstore # noqa + from ..data import elastic from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 218d46e0..39a5b46a 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -124,7 +124,9 @@ class Container: async def process_logs_until( self, - patt: str, + # this is a predicate func for matching log msgs emitted by the + # underlying containerized app + patt_matcher: Callable[[str], bool], bp_on_msg: bool = False, ) -> bool: ''' @@ -135,7 +137,14 @@ class Container: seen_so_far = self.seen_so_far while True: - logs = self.cntr.logs() + try: + logs = self.cntr.logs() + except ( + docker.errors.NotFound, + docker.errors.APIError + ): + return False + entries = logs.decode().split('\n') for entry in entries: @@ -143,31 +152,38 @@ class Container: if not entry: continue + entry = entry.strip() try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - raise + record = json.loads(entry) + + if 'msg' in record: + msg = record['msg'] + elif 'message' in record: + msg = record['message'] + else: + raise KeyError(f'Unexpected log format\n{record}') + + level = record['level'] + + except json.JSONDecodeError: + msg = entry + level = 'error' - msg = record['msg'] - level = record['level'] if msg and entry not in seen_so_far: seen_so_far.add(entry) if bp_on_msg: await tractor.breakpoint() - getattr(log, level, log.error)(f'{msg}') + getattr(log, level.lower(), log.error)(f'{msg}') - # print(f'level: {level}') - if level in ('error', 'fatal'): + if level == 'fatal': raise ApplicationLogError(msg) - if patt in msg: + if await patt_matcher(msg): return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.01) + await trio.sleep(0.1) return False @@ -285,6 +301,7 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type + start_timeout: float = 1.0, **kwargs, @@ -300,17 +317,20 @@ async def open_ahabd( ( dcntr, cntr_config, - start_msg, - stop_msg, + start_lambda, + stop_lambda, ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(1): - found = await cntr.process_logs_until(start_msg) + with trio.move_on_after(start_timeout): + found = await cntr.process_logs_until(start_lambda) + + if not found and dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) - if not found and cntr not in client.containers.list(): raise RuntimeError( - 'Failed to start `marketstore` check logs deats' + f'Failed to start {dcntr.id} check logs deats' ) await ctx.started(( @@ -326,12 +346,13 @@ async def open_ahabd( await trio.sleep_forever() finally: - await cntr.cancel(stop_msg) + await cntr.cancel(stop_lambda) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], + start_timeout: float = 1.0, task_status: TaskStatus[ tuple[ trio.Event, @@ -379,6 +400,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), + start_timeout=start_timeout ) as (ctx, first): cid, pid, cntr_config = first diff --git a/piker/data/elastic.py b/piker/data/elastic.py new file mode 100644 index 00000000..43c6afd0 --- /dev/null +++ b/piker/data/elastic.py @@ -0,0 +1,109 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from __future__ import annotations +from contextlib import asynccontextmanager as acm +from pprint import pformat +from typing import ( + Any, + TYPE_CHECKING, +) + +import pyqtgraph as pg +import numpy as np +import tractor + + +if TYPE_CHECKING: + import docker + from ._ahab import DockerContainer + +from piker.log import ( + get_logger, + get_console_log +) + +import asks + + +log = get_logger(__name__) + + +# container level config +_config = { + 'port': 19200, + 'log_level': 'debug', +} + + +def start_elasticsearch( + client: docker.DockerClient, + + **kwargs, + +) -> tuple[DockerContainer, dict[str, Any]]: + ''' + Start and supervise an elasticsearch instance with its config bind-mounted + in from the piker config directory on the system. + + The equivalent cli cmd to this code is: + + sudo docker run \ + -itd \ + --rm \ + --network=host \ + --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + --env "elastic_username=elastic" \ + --env "elastic_password=password" \ + --env "xpack.security.enabled=false" \ + elastic + + ''' + import docker + get_console_log('info', name=__name__) + + dcntr: DockerContainer = client.containers.run( + 'piker:elastic', + name='piker-elastic', + network='host', + detach=True, + remove=True + ) + + async def start_matcher(msg: str): + try: + health = (await asks.get( + f'http://localhost:19200/_cat/health', + params={'format': 'json'} + )).json() + + except OSError: + log.error('couldnt reach elastic container') + return False + + log.info(health) + return health[0]['status'] == 'green' + + async def stop_matcher(msg: str): + return msg == 'closed' + + return ( + dcntr, + {}, + # expected startup and stop msgs + start_matcher, + stop_matcher, + ) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 88553af7..236bcfaf 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -189,13 +189,20 @@ def start_marketstore( init=True, # remove=True, ) + + async def start_matcher(msg: str): + return "launching tcp listener for all services..." in msg + + async def stop_matcher(msg: str): + return "exiting..." in msg + return ( dcntr, _config, # expected startup and stop msgs - "launching tcp listener for all services...", - "exiting...", + start_matcher, + stop_matcher, ) diff --git a/setup.py b/setup.py index bd6363c5..2a686cc5 100755 --- a/setup.py +++ b/setup.py @@ -85,7 +85,10 @@ setup( 'tsdb': [ 'docker', ], - + 'es': [ + 'docker', + 'elasticsearch' + ] }, tests_require=['pytest'], python_requires=">=3.10", diff --git a/tests/test_databases.py b/tests/test_databases.py new file mode 100644 index 00000000..4eb444f3 --- /dev/null +++ b/tests/test_databases.py @@ -0,0 +1,66 @@ +import pytest +import trio + +from typing import AsyncContextManager + +from piker._daemon import Services +from piker.log import get_logger + +from elasticsearch import Elasticsearch +from piker.data import marketstore + +def test_marketstore_startup_and_version( + open_test_pikerd: AsyncContextManager, + loglevel, +): + + ''' + Verify marketstore starts correctly + + ''' + log = get_logger(__name__) + + async def main(): + # port = 5995 + + async with ( + open_test_pikerd( + loglevel=loglevel, + tsdb=True + ) as (s, i, pikerd_portal, services), + marketstore.get_client() as client + ): + + assert ( + len(await client.server_version()) == + len('3862e9973da36cfc6004b88172c08f09269aaf01') + ) + + + trio.run(main) + + +def test_elasticsearch_startup_and_version( + open_test_pikerd: AsyncContextManager, + loglevel, +): + ''' + Verify elasticsearch starts correctly + + ''' + + log = get_logger(__name__) + + async def main(): + port = 19200 + + async with open_test_pikerd( + loglevel=loglevel, + es=True + ) as (s, i, pikerd_portal, services): + + es = Elasticsearch(hosts=[f'http://localhost:{port}']) + assert es.info()['version']['number'] == '7.17.4' + + + trio.run(main)