From 3dd82c8d31665233845ffe3b13756090e45159db Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 4 Oct 2021 16:34:54 -0400
Subject: [PATCH] Fix the drunk fix

This should finally make fsp src-to-dst array syncing correct.

There are a few edge cases but mostly we need to both sync back-filled
history diffs and avoid current step lag/leads.

Use a polling routine and the more stringent task re-spawn system to
get this right.
---
 piker/fsp/_engine.py | 94 +++++++++++++++++++++++++++++---------------
 1 file changed, 62 insertions(+), 32 deletions(-)

diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index 2df3c126..70e86481 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -148,24 +148,27 @@ async def fsp_compute(
     # import time
     # last = time.time()

-    # rt stream
-    async for processed in out_stream:
+    try:
+        # rt stream
+        async for processed in out_stream:

-        log.debug(f"{func_name}: {processed}")
-        index = src.index
-        dst.array[-1][func_name] = processed
+            log.debug(f"{func_name}: {processed}")
+            index = src.index
+            dst.array[-1][func_name] = processed

-        # NOTE: for now we aren't streaming this to the consumer
-        # stream latest array index entry which basically just acts
-        # as trigger msg to tell the consumer to read from shm
-        if attach_stream:
-            await stream.send(index)
+            # NOTE: for now we aren't streaming this to the consumer
+            # stream latest array index entry which basically just acts
+            # as trigger msg to tell the consumer to read from shm
+            if attach_stream:
+                await stream.send(index)

-        # period = time.time() - last
-        # hz = 1/period if period else float('nan')
-        # if hz > 60:
-        #     log.info(f'FSP quote too fast: {hz}')
-        # last = time.time()
+            # period = time.time() - last
+            # hz = 1/period if period else float('nan')
+            # if hz > 60:
+            #     log.info(f'FSP quote too fast: {hz}')
+            # last = time.time()
+    finally:
+        tracker.complete.set()


 @tractor.context
@@ -217,7 +220,7 @@ async def cascade(
     profiler(f'{func_name}: feed up')

     assert src.token == feed.shm.token
-    last_len = new_len = len(src.array)
+    # last_len = new_len = len(src.array)

     async with (
         ctx.open_stream() as stream,
@@ -249,9 +252,16 @@ async def cascade(
         await ctx.started(index)
         profiler(f'{func_name}: fsp up')

+        async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
+            # TODO: adopt an incremental update engine/approach
+            # where possible here eventually!
+            log.warning(f're-syncing fsp {func_name} to source')
+            tracker.cs.cancel()
+            await tracker.complete.wait()
+            return await n.start(fsp_target)
+
         # Increment the underlying shared memory buffer on every
         # "increment" msg received from the underlying data feed.
-
         async with feed.index_stream() as stream:
             profiler(f'{func_name}: sample stream up')

@@ -263,23 +273,44 @@ async def cascade(
             # array has been updated such that we compute
             # new history from the (prepended) source.
             diff = src.index - dst.index
-            new_len = len(src.array)

-            # XXX: ok no idea why this works but "drunk fix"
-            # says it don't matter.
+            # new_len = len(src.array)
+
+            async def poll_and_sync_to_step(tracker):
+                diff = src.index - dst.index
+                while True:
+                    if diff in (0, 1):
+                        break
+
+                    tracker, index = await resync(tracker)
+                    diff = src.index - dst.index
+                    # log.info(
+                    #     '\n'.join((
+                    #         f'history index after sync: {index}',
+                    #         f'diff after sync: {diff}',
+                    #     ))
+                    # )
+
+                return tracker, diff
+
+            # log.debug(f'diff {diff}')
+
             if (
-                new_len > last_len + 1 or
-                abs(diff) > 1
-            ):
-                # TODO: adopt an incremental update engine/approach
-                # where possible here eventually!
-                log.warning(f're-syncing fsp {func_name} to source')
-                tracker.cs.cancel()
-                await tracker.complete.wait()
-                tracker, index = await n.start(fsp_target)
+                # the source is likely backfilling and we must
+                # sync history calculations
+                abs(len(src.array) - len(dst.array)) > 0 or

-                # skip adding a new bar since we should be fully aligned.
-                continue
+                # we aren't step synced to the source and may be
+                # leading/lagging by a step
+                diff > 1 or
+                diff < 0
+            ):
+                tracker, diff = await poll_and_sync_to_step(tracker)
+
+                # skip adding a last bar since we should be
+                # source aligned
+                if diff == 0:
+                    continue

             # read out last shm row, copy and write new row
             array = dst.array
@@ -292,4 +323,3 @@ async def cascade(

             last = array[-1:].copy()
             dst.push(last)
-            last_len = new_len
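
For readers skimming the patch, below is a minimal, self-contained sketch
(plain trio, no piker/shm/tractor dependencies) of the cancel, wait-for-
teardown, then re-spawn pattern that the new resync() helper builds on.
TaskTracker mirrors the name used in the real code; compute_task, its
placeholder sleep loop, and the single hard-coded resync call in main() are
hypothetical stand-ins for fsp_compute and the src/dst index checks done in
cascade().

# sketch: cancel -> wait for teardown -> re-spawn, using plain trio
from dataclasses import dataclass

import trio


@dataclass
class TaskTracker:
    complete: trio.Event   # set once the task has fully unwound
    cs: trio.CancelScope   # cancelling this tears the task down


async def compute_task(
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    tracker = TaskTracker(complete=trio.Event(), cs=trio.CancelScope())
    # hand our tracker back to whoever called ``nursery.start()``
    task_status.started(tracker)
    try:
        with tracker.cs:
            # stand-in for the real-time compute loop over ``out_stream``
            await trio.sleep_forever()
    finally:
        # always flag completion so a pending resync never deadlocks on .wait()
        tracker.complete.set()


async def resync(
    nursery: trio.Nursery,
    tracker: TaskTracker,
) -> TaskTracker:
    # cancel the live task, wait for it to finish unwinding, then re-spawn
    # it so (backfilled) history gets recomputed from scratch
    tracker.cs.cancel()
    await tracker.complete.wait()
    return await nursery.start(compute_task)


async def main() -> None:
    async with trio.open_nursery() as nursery:
        tracker = await nursery.start(compute_task)
        # ...later, on a detected src/dst index or length mismatch:
        tracker = await resync(nursery, tracker)
        nursery.cancel_scope.cancel()


if __name__ == '__main__':
    trio.run(main)

In the real engine resync() also returns the new shm index reported via
task_status.started() so the caller can re-check its step diff after the
respawn; that detail is elided in this sketch.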