How to implement buffering with timeout in RX

I need to implement event processing that is delayed until no new events have arrived for a certain period. (I have to queue up a parsing task when the text buffer changes, but I don't want to start parsing while the user is still typing.)

I'm new to Rx, but as far as I can see, I would need a combination of the BufferWithTime and Timeout methods. I imagine it working like this: it buffers events for as long as each one arrives within a specified time span of the previous one. If there is a gap in the event flow (longer than the time span), it should propagate the events buffered so far.

Having looked at how Buffer and Timeout are implemented, I could probably implement my own BufferWithTimeout method (if anyone has one, please share it with me), but I wonder whether this can be achieved just by combining the existing methods. Any ideas?


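BufferWithTime looks like what you are after, but it flushes on a fixed interval rather than after a gap between events, so on its own it does not quite fit.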

There is nothing built in, but something like this should work:

Note: If the source raises an error, the buffer is not flushed. This matches the current behavior of the BufferWith* operators (or at least it did the last time I checked).

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

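                // Restart the inactivity timer. This relies on MutableDisposable
                // disposing the previously scheduled flush when a new one is assigned.
                timeoutDisposable.Disposable =
                    scheduler.Schedule(flushBuffer, timeout);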
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}
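
For readers on a more recent Rx release, here is a rough port of the same idea together with a small console driver. It is only a sketch under a few assumptions: that CreateWithDisposable and MutableDisposable correspond to Observable.Create and SerialDisposable in the System.Reactive package, that the scheduler overload takes the due time first, and that a Subject<string> is a fair stand-in for the text-change events. The class names and timings are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferWithTimeoutExtensions
{
    public static IObservable<TSource[]> BufferWithTimeout<TSource>(
        this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
    {
        return Observable.Create<TSource[]>(observer =>
        {
            var gate = new object();
            var buffer = new List<TSource>();
            // Assumption: SerialDisposable disposes the previous timer on assignment.
            var timer = new SerialDisposable();

            Action flush = () =>
            {
                TSource[] values;
                lock (gate)
                {
                    values = buffer.ToArray();
                    buffer.Clear();
                }
                observer.OnNext(values);
            };

            var subscription = source.Subscribe(
                value =>
                {
                    lock (gate) { buffer.Add(value); }
                    // Every new value pushes the flush further into the future.
                    timer.Disposable = scheduler.Schedule(timeout, flush);
                },
                observer.OnError,      // as in the original, errors do not flush
                () =>
                {
                    flush();
                    observer.OnCompleted();
                });

            return new CompositeDisposable(subscription, timer);
        });
    }
}

public static class Demo
{
    public static void Main()
    {
        var textChanges = new Subject<string>();   // hypothetical event source

        var batches = textChanges.BufferWithTimeout(
            TimeSpan.FromMilliseconds(200), Scheduler.Default);

        using (batches.Subscribe(b => Console.WriteLine(string.Join(", ", b))))
        {
            textChanges.OnNext("a");
            textChanges.OnNext("b");
            Thread.Sleep(500);          // pause longer than the timeout
            textChanges.OnNext("c");
            textChanges.OnCompleted();  // flushes whatever is still buffered
        }
    }
}
```

In the parsing scenario from the question, the subscriber would presumably start the parse from the last element of each batch rather than print it.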

This is quite an old question, but I do believe the following answer is worth mentioning, since all other solutions have forced the user to subscribe manually, track changes, etc.

I offer the following as an "Rx-y" solution.

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

Basically, the source is windowed until some observable, defined in terms of the newest window, fires. A new window (grouped observable) is then created, and we use that window to determine when it should close. In this case, I'm closing the window after 5 seconds of inactivity or at a maximum length of 10 items (9 + 1).
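
To try the idea without a real event source, the same pipeline can be driven from a console program. This is only a sketch: the Subject<int> source, the shortened 300 ms timeout and the limit of 3 items are stand-ins chosen for a quick run, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class GroupByUntilDemo
{
    public static void Main()
    {
        var source = new Subject<int>();   // hypothetical stand-in for the real source

        var buffers = source
            .GroupByUntil(
                _ => true,                                       // a single group at a time
                g => Observable.Amb(
                    g.Throttle(TimeSpan.FromMilliseconds(300)),  // close after 300 ms of silence
                    g.Skip(2)))                                  // or on the 3rd item (2 + 1)
            .SelectMany(g => g.ToArray());                       // each closed group becomes an array

        using (buffers.Subscribe(b => Console.WriteLine("[" + string.Join(", ", b) + "]")))
        {
            source.OnNext(1);
            source.OnNext(2);
            Thread.Sleep(600);    // gap longer than the timeout, so the first group closes
            source.OnNext(3);
            source.OnNext(4);
            source.OnNext(5);     // third item of the new group closes it by count
            source.OnCompleted();
        }
    }
}
```

Because Throttle fires on a scheduler thread, the subscriber is called from a different thread than the one pushing values; a UI application would probably want an ObserveOn before subscribing.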


In addition to Richard Szalay's answer, I've just been looking into the new Window operator from the latest Rx release. It 'kind of' solves your problem in that you can 'buffer with a timeout', i.e. get the output within a window of time that lasts until the timeout is reached, but instead of receiving the results as an IEnumerable you get them as an IObservable.

Here's a quick example of what I mean:

private void SetupStream()
{
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => new MouseButtonEventHandler(h), 
        h => MouseDown += h,
        h => MouseDown -= h);

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
        .Switch();

    inputStream.Window(() => timeout)
        .Subscribe(OnWindowOpen);
}


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
{
    Trace.WriteLine(string.Format("Window open"));

    var buffer = new List<IEvent<MouseButtonEventArgs>>();

    window.Subscribe(click =>
    {

        Trace.WriteLine(string.Format("Click"));

        buffer.Add(click);

    }, () => ProcessEvents(buffer));
}

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
{
    Trace.WriteLine(string.Format("Window closed"));

    //...
}

Every time a window opens, you receive the events as they come in, store them in a buffer, and process them when the window completes (which actually happens when the next window opens).
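
The same shape can be tried outside a WPF application. The sketch below swaps the mouse events for a Subject<string> and collects each window with ToList instead of a hand-written list; both substitutions, the 300 ms timeout and the class name are assumptions made for the sake of a runnable console example against the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class WindowDemo
{
    public static void Main()
    {
        var input = new Subject<string>();   // hypothetical stand-in for the mouse events

        // Each event starts a fresh timer and Switch drops the previous one,
        // so this only fires after 300 ms without any events.
        var timeout = input
            .Select(_ => Observable.Timer(TimeSpan.FromMilliseconds(300)))
            .Switch();

        var batches = input
            .Window(() => timeout)                    // a new window opens when the old one closes
            .SelectMany(window => window.ToList())    // collect each window once it completes
            .Where(batch => batch.Count > 0);         // ignore windows that saw no events

        using (batches.Subscribe(batch => Console.WriteLine(string.Join(", ", batch))))
        {
            input.OnNext("a");
            input.OnNext("b");
            Thread.Sleep(600);     // quiet period longer than the timeout
            input.OnNext("c");
            input.OnCompleted();   // completes the last open window
        }
    }
}
```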

I'm not sure whether Richard would change his example to use Window now that it's available, but I thought it was worth raising as an alternative.
