friendly list as a change log

I need an advice on the data structure to use as an atomic change log.

I'm trying to implement the following algorithm. There is a flow of incoming changes updating an in-memory map. In Haskell-like pseudocode it is

    update :: DataSet -> SomeListOf Change -> Change -> STM (DataSet, SomeListOf Change)
    update dataSet existingChanges newChange = do
      ...
      return (dataSet, existingChanges ++ [newChange])

where DataSet is a map (currently it is the Map from the stm-containers package, https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Map.html). The whole "update" is called from arbitrary number of threads. Some of the Change's can be rejected due to domain semantics, I use throwSTM for that to throw away the effect of the transaction. In case of successful commit the "newChange" is added to the list.

There exists separate thread which calls the following function:

    flush :: STM (DataSet, SomeListOf Change) -> IO ()

this function is supposed to take the current snapshot of DataSet together with the list of changes (it has to a consistent pair) and flush it to the filesystem, ie

    flush data = do
      (dataSet, changes) <- atomically $ readTVar data_
      -- write them both to FS
      -- ...
      atomically $ writeTVar data_ (dataSet, [])

I need an advice about the data structure to use for "SomeListOf Change". I don't want to use [Change] because it is "too ordered" and I'm afraid there will be too many conflicts, which will force the whole transaction to retry. Please correct me, if I'm wrong here.

I cannot use the Set (https://hackage.haskell.org/package/stm-containers-0.2.10/docs/STMContainers-Set.html) because I still need to preserve some order, eg the order of transaction commits. I could use TChan for it and it looks like a good match (exactly the order of transaction commits), but I don't know how to implement the "flush" function so that it would give the consistent view of the whole change log together with the DataSet.

The current implementation of that is here https://github.com/lolepezy/rpki-pub-server/blob/add-storage/src/RRDP/Repo.hs, in the functions applyActionsToState and rrdpSyncThread, respectively. It uses TChan and seems to do it in a wrong way.

Thank you in advance.

Update: A reasonable answer seems to be like that

    type SomeListOf c = TChan [c] 

    update :: DataSet -> TChan [Change] -> Change -> STM DataSet
    update dataSet existingChanges newChange = do
      ...
      writeTChan changeChan $ reverse (newChange : existingChanges)
      return dataSet

   flush data_ = do
      (dataSet, changes) <- atomically $ (,) <$> readTVar data_ <*> readTChan changeChan
      -- write them both to FS
      -- ...

But I'm still not sure whether it's a neat solution to pass the whole list as an element of the channel.


I'd probably just go with the list and see how far it takes performance-wise. Given that, you should consider that both, appending to the end of a list and reversing it are O(n) operations, so you should try to avoid this. Maybe you can just prepend the incoming changes like this:

update dataSet existingChanges newChange = do
  -- ...
  return (dataSet, newChange : existingChanges)

Also, your example for flush has the problem that reading and updating the state is not atomic at all. You must accomplish this using a single atomically call like so:

flush data = do
  (dataSet, changes) <- atomically $ do
    result <- readTVar data_
    writeTVar data_ (dataSet, [])
    return result

  -- write them both to FS
  -- ...

You could then just write them out in reverse order (because now changes contains the elements from newest to oldest) or reverse here once if it's important to write them out oldest to newest. If that's important I'd probably go with some data structure which allows O(1) element access like a good old vector.

When using a fixed-size vector you would obviously have to deal with the problem that it can become "full" which would mean your writers would have to wait for flush to do it's job before adding fresh changes. That's why I'd personally go for the simple list first and see if it's sufficient or where it needs to be improved.

PS: A dequeue might be a good fit for your problem as well, but going fixed size forces you to deal with the problem that your writers can potentially produce more changes than your reader can flush out. The dequeue can grow infinitely, but you your RAM probably isn't. And the vector has pretty low overhead.


I made some (very simplistic) investigation https://github.com/lolepezy/rpki-pub-server/tree/add-storage/test/changeLog imitating exactly the type of load I supposedly going to have. I used the same STMContainers.Map for the data set and usual list for the change log. To track the number of transaction retries, I used Debug.Trace.trace, meaning, the number of lines printed by trace. And the number of unique lines printed by trace gives me the number of committed transactions.

The result is here (https://github.com/lolepezy/rpki-pub-server/blob/add-storage/test/changeLog/numbers.txt). The first column is the number of threads, the second is the number of change sets generated in total. The third column is the number of trace calls for the case without change log and the last one is the number of trace calls with the change log.

Apparently most of the time change log adds some extra retries, but it's pretty much insignificant. So, I guess, it's fair to say that any data structure would be good enough, because most of the work is related to updating the map and most of the retries are happening because of it.

链接地址: http://www.djcxy.com/p/81460.html

上一篇: Haskell STM并重试

下一篇: 友好的列表作为更改日志