How would I organize these calls using Reactive Extensions (Rx) in Silverlight?

I have some calls that must execute sequentially. Consider an IService that has a Query and a Load method. Query returns a list of widgets, and Load provides a "default" widget. Hence, my service looks like this:

void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback); 

With that in mind, here is a rough sketch of the view model:

public class ViewModel : BaseViewModel
{
   public ViewModel()
   {
      Widgets = new ObservableCollection<Widget>();

      WidgetService.Query((widgets,exception) =>
      {
          if (exception != null) 
          {
              throw exception;
          }

          Widgets.Clear();

          foreach(var widget in widgets)
          {
             Widgets.Add(widget);
          }

          WidgetService.Load((defaultWidget,ex) =>
          {
             if (ex != null)
             {
                 throw ex;
             }
             if (defaultWidget != null)
             {
                CurrentWidget = defaultWidget;
             }
          });
      });
   }

   public IService WidgetService { get; set; } // assume this is wired up

   public ObservableCollection<Widget> Widgets { get; private set; }

   private Widget _currentWidget; 

   public Widget CurrentWidget 
   {
      get { return _currentWidget; }
      set 
      {
         _currentWidget = value; 
         RaisePropertyChanged(()=>CurrentWidget);
      }
   }
}

What I'd like to do is simplify the sequential workflow of calling query and then the default. Perhaps the best way to do this is nested with lambda expressions as I've shown, but I figured there may be a more elegant way with Rx. I don't want to use Rx for the sake of Rx, but if it can allow me to organize the logic above so it is easier to read/maintain in the method, I'll take advantage of it. Ideally, something like:

Observable.Create(
   ()=>firstAction(), 
   ()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; }); 

With the Power Threading library, I'd do something like:

Service.Query(list => { result = list; });
yield return 1;
ProcessList(result);
Service.Load(widget => { defaultWidget = widget; });
yield return 1;
CurrentWidget = defaultWidget;

That makes it far more evident that the workflow is sequential and eliminates nesting (the yields are part of the async enumerator and are boundaries that block until the results come back).

Anything similar would make sense to me.

So the essence of the question: am I trying to fit a square peg into a round hole, or is there a way to redefine the nested asynchronous calls using Rx?


You can convert the service methods so that they return an IObservable instead of taking a callback as a parameter. The sequential workflow can then be implemented with SelectMany, something like this:

        WidgetService.Query()
            .SelectMany(
                widgets =>
                {
                    Widgets.Clear();
                    foreach (var w in widgets)
                    {
                        Widgets.Add(w);
                    }

                    return WidgetService.Load();
                }
            )
            .Do(
                defaultWidget =>
                {
                    if (defaultWidget != null)
                        Default = defaultWidget;
                }
            )
            .Subscribe(
                _ => { },
                e => { throw e; }
            );
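
The snippet above relies on service methods that return IObservable, which the callback-based IService from the question does not have. A minimal sketch of one way to adapt it is below. It assumes a current System.Reactive package (the Rx builds that shipped for Silverlight had a different Observable.Create signature), and the names FromCallback, QueryAsObservable and LoadAsObservable are hypothetical helpers, not part of Rx or of the original service:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public class Widget { }

// Callback-style contract, as described in the question.
public interface IService
{
    void Query(Action<IEnumerable<Widget>, Exception> callback);
    void Load(Action<Widget, Exception> callback);
}

// Hypothetical adapter: these helpers are illustrative, not an existing API.
public static class ServiceObservableExtensions
{
    // Wraps one callback-style call in a sequence that yields a single
    // result and completes, or fails with the reported exception.
    public static IObservable<T> FromCallback<T>(Action<Action<T, Exception>> call)
    {
        return Observable.Create<T>(observer =>
        {
            call((result, error) =>
            {
                if (error != null)
                {
                    observer.OnError(error);
                    return;
                }
                observer.OnNext(result);
                observer.OnCompleted();
            });
            // The callback API offers no cancellation, so there is nothing to dispose.
            return Disposable.Empty;
        });
    }

    public static IObservable<IEnumerable<Widget>> QueryAsObservable(this IService service)
    {
        return FromCallback<IEnumerable<Widget>>(service.Query);
    }

    public static IObservable<Widget> LoadAsObservable(this IService service)
    {
        return FromCallback<Widget>(service.Load);
    }
}
```

With these in place, WidgetService.Query() and WidgetService.Load() in the SelectMany chain would become WidgetService.QueryAsObservable() and WidgetService.LoadAsObservable(). The service call is only made when something subscribes, so nothing runs until Subscribe is called.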

However, IMO F# async workflows look much clearer (in this sample I assume that the service methods return Async<IEnumerable<Widget>> and Async<Widget> respectively). Note that the sample doesn't take into account which thread modifies the data fields; in real-world code you should pay attention to this:

    let load = async {
            let! widgets = WidgetService.Query()

            Widgets.Clear()
            for w in widgets do
                Widgets.Add(w)

            let! defaultWidget = WidgetService.Load()
            if defaultWidget <> null then
                Default <- defaultWidget

            return ()
        }

    Async.StartWithContinuations(
        load, 
        ignore, // success continuation - ignore result
        raise,  // error continuation - reraise exception
        ignore  // cancellation continuation - ignore
        )

EDITED

In fact, it is possible to use the iterator technique you mentioned in your question:

    private IEnumerable<IObservable<object>> Intialize()
    {
        var widgetsList = WidgetService.Query().Start();
        yield return widgetsList;

        Widgets.Clear();
        foreach (var w in widgetsList[0])
        {
            Widgets.Add(w);
        }

        var defaultWidgetList = WidgetService.Load().Start();
        yield return defaultWidgetList;

        if (defaultWidgetList[0] != null)
            Default = defaultWidgetList[0];
    }

    Observable
        .Iterate(Intialize)
        .Subscribe(
        _ => { },
        ex => { throw ex; }
        );

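You can also do this using ReactiveXaml, though since your CurrentWidget and Widgets are both mutable, you can't make it as clean (there is a class called ObservableAsPropertyHelper that updates a property based on an IObservable and fires RaisePropertyChanged):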

public class ViewModel
{
    public ViewModel()
    {
        // These return a Func that wraps an async call in an IObservable<T>
        // that always yields only one item (the result of the call)
        var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
        var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);

        // Create a new command 
        QueryAndLoad = new ReactiveAsyncCommand();

        // QueryAndLoad fires every time someone calls ICommand.Execute
        // The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
        var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
                                        .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));

        // Query up the Widgets 
        async_results.Subscribe(x => x.Run(Widgets.Add));

        // Now execute the Load
        async_results.SelectMany(_ => LoadAsObservable())
                     .Subscribe(x => CurrentWidget = x);

        QueryAndLoad.Execute();
    }

    public ReactiveAsyncCommand QueryAndLoad {get; private set; }

    public ObservableCollection<Widget> Widgets {get; private set; }

    public Widget CurrentWidget {get; set; }
}