From 614bb1717b7c6c0f8802bf12d214a88fb8a5c283 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 07:33:43 -0400 Subject: [PATCH] Fix shm index update race There was a lingering issue where the fsp daemon would sync its shm array with the source data and we'd set the start/end indices to the same value. Under some races a reader would then read an empty `.array` which it wasn't expecting. This fixes that as well as tidies up the `ShmArray.push()` logic and adds a temporary check in `.array` for zero length if the array hasn't been written yet. We can now start removing read array length checks in consumer code and hopefully no more races will show up. --- piker/data/_sharedmem.py | 46 ++++++++++++++++++++++++++++++++++------ piker/ui/_chart.py | 33 ++++++++++++++-------------- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 53c40423..8afa3214 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -168,6 +168,7 @@ class ShmArray: self._len = len(shmarr) self._shm = shm + self._post_init: bool = False # pushing data does not write the index (aka primary key) self._write_fields = list(shmarr.dtype.fields.keys())[1:] @@ -196,19 +197,37 @@ class ShmArray: @property def array(self) -> np.ndarray: - return self._array[self._first.value:self._last.value] + '''Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a def last( self, length: int = 1, ) -> np.ndarray: - return self.array[-length:] + return self.array[-length] def push( self, data: np.ndarray, prepend: bool = False, + start: Optional[int] = None, ) -> int: '''Ring buffer like "push" to append data @@ -217,17 +236,18 @@ class ShmArray: NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. ''' + self._post_init = True length = len(data) + index = start or self._last.value if prepend: index = self._first.value - length + if index < 0: raise ValueError( f'Array size of {self._len} was overrun during prepend.\n' 'You have passed {abs(index)} too many datums.' ) - else: - index = self._last.value end = index + length @@ -235,11 +255,22 @@ class ShmArray: try: self._array[fields][index:end] = data[fields][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. if prepend: + assert index < self._first.value + + if index < self._first.value: self._first.value = index else: self._last.value = end + return end + except ValueError as err: # shoudl raise if diff detected self.diff_err_fields(data) @@ -301,16 +332,19 @@ _default_size = 3 * _secs_in_day def open_shm_array( + key: Optional[str] = None, size: int = _default_size, dtype: Optional[np.dtype] = None, readonly: bool = False, + ) -> ShmArray: - """Open a memory shared ``numpy`` using the standard library. + '''Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). - """ + + ''' # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 04f24d50..dc9387ea 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -936,11 +936,12 @@ class ChartPlotWidget(pg.PlotWidget): **kwargs, ) -> pg.GraphicsObject: - """Update the named internal graphics from ``array``. - - """ + '''Update the named internal graphics from ``array``. + ''' + assert len(array) data_key = array_key or graphics_name + if graphics_name not in self._overlays: self._arrays['ohlc'] = array else: @@ -948,21 +949,19 @@ class ChartPlotWidget(pg.PlotWidget): curve = self._graphics[graphics_name] - if len(array): - # TODO: we should instead implement a diff based - # "only update with new items" on the pg.PlotCurveItem - # one place to dig around this might be the `QBackingStore` - # https://doc.qt.io/qt-5/qbackingstore.html + # NOTE: back when we weren't implementing the curve graphics + # ourselves you'd have updates using this method: + # curve.setData(y=array[graphics_name], x=array['index'], **kwargs) - # NOTE: back when we weren't implementing the curve graphics - # ourselves you'd have updates using this method: - # curve.setData(y=array[graphics_name], x=array['index'], **kwargs) - - curve.update_from_array( - x=array['index'], - y=array[data_key], - **kwargs - ) + # NOTE: graphics **must** implement a diff based update + # operation where an internal ``FastUpdateCurve._xrange`` is + # used to determine if the underlying path needs to be + # pre/ap-pended. + curve.update_from_array( + x=array['index'], + y=array[data_key], + **kwargs + ) return curve