diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 21688b8b..7bc4510c 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -254,8 +254,15 @@ async def cascade( # respawn the compute task if the source # array has been updated such that we compute # new history from the (prepended) source. + diff = src.index - dst.index new_len = len(src.array) - if new_len > last_len + 1: + + # XXX: ok no idea why this works but "drunk fix" + # says it don't matter. + if ( + new_len > last_len + 1 or + abs(diff) > 1 + ): log.warning(f're-syncing fsp {func_name} to source') cs.cancel() cs, index = await n.start(fsp_target) @@ -266,24 +273,12 @@ async def cascade( # read out last shm row, copy and write new row array = dst.array - # TODO: some signals, like vlm should be reset to - # zero every step. - last = array[-1:].copy() + # some metrics, like vlm should be reset + # to zero every step. if zero_on_step: last = zeroed + else: + last = array[-1:].copy() dst.push(last) last_len = new_len - - # compare again with source and make sure - # histories are index aligned. - diff = src.index - dst.index - if diff: - if abs(diff) < 10: - log.warning( - f'syncing fsp to source by offset: {diff}') - history = dst.array - dst.push(history[:-1], start=src._first.value) - else: - log.warning( - f'large offset {diff} re-spawn ongoing?..')