Monadic与状态monad在不变的空间(堆和堆栈)中折叠?

在固定堆栈和堆空间中可以在状态单元中执行折叠吗? 或者是一种更适合我的问题的不同功能技术?

接下来的部分将介绍这个问题和一个激励用例。 我正在使用Scala,但Haskell中的解决方案也是受欢迎的。


折叠在State Monad填补堆

假设斯卡拉7.考虑国家单子中的monadic fold。 为避免堆栈溢出,我们将掀起折叠。

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

对于一个大集合col ,这将填充堆。

我相信在折叠期间,会为集合中的每个值( x: R参数)创建一个闭包(州状态),填充堆。 在run 0执行之前,这些都不能被评估,提供初始状态。

这个O(n)堆的使用可以避免吗?

更具体地说,折叠之前是否可以提供初始状态,以便State monad可以在每次绑定期间执行,而不是嵌套关闭以供以后评估?

还是可以构建折叠,以便在国家monad run后懒惰地run ? 通过这种方式,下一个x: R闭包将不会被创建,直到之前的那些被评估并适合垃圾收集。

或者这种工作有更好的功能范式吗?


应用示例

但也许我正在使用错误的工具来完成这项工作。 下面是一个示例用例的演变。 我在这里徘徊在错误的道路上吗?

考虑油藏采样,即从一个集合中选取一个统一的随机k项目,以适合记忆。 在斯卡拉,这样的功能可能是

def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

如果可以使用像这样使用TraversableOnce类型

val tenRandomInts = (Int.Min to Int.Max) sample 10

sample完成的工作本质上是一个fold

def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

但是, update是有状态的; 它取决于n ,已经看到的项目的数量。 (它也取决于一个RNG,但为了简单起见,我认为它是全局的和有状态的。用于处理n的技术将平凡地延伸)。 那么如何处理这个状态呢?

不纯的解决方案很简单,并且以不变的堆栈和堆栈运行。

/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

但是纯功能解决方案呢? update必须将n作为附加参数,并将新值与更新的样本一起返回。 我们可以将n纳入隐式状态,折叠累加器,例如,

(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

但是这掩盖了意图; 我们只是真的打算累积样本向量。 这个问题似乎已经为国家monad和monadic left fold做好了准备。 让我们再试一次。

我们将使用Scalaz 7和这些导入

import scalaz._
import Scalaz._
import scalaz.std.iterable_

并且在Iterable[A] ,因为Scalaz不支持Traversable monadic折叠。

现在定义了sample

// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

哪里更新

// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

不幸的是,这使得堆叠在一个大集合上。

所以让我们蹦跳吧。 现在是sample

// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

哪里更新

// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

这解决了堆栈溢出问题,但仍然是堆栈非常庞大(或非常小的堆栈)。 在折叠过程中,每个值的一个匿名函数被创建(我相信要关闭每个x: A参数),在蹦床运行之前消耗堆。 (FWIW,国家版本也有这个问题;堆栈溢出只是先显示较小的集合。)


我们真正的问题是未执行的国家运动所使用的堆。

不它不是。 真正的问题是该集合不适合内存,并且foldLeftMfoldRightM强制整个集合。 不纯解决方案的一个副作用是,你正在释放内存。 在“纯功能”解决方案中,你不会在任何地方做到这一点。

你对Iterable使用忽略了一个关键的细节:实际上是什么样的collection col ,它的元素是如何创建的,以及它们如何被抛弃。 所以,必然的, foldLeftMIterable 。 这可能太严格了,你会迫使整个系列进入记忆。 例如,如果它是一个Stream ,那么只要你持有到col至今被迫将内存中的所有元素。 如果它是一些其他类型的不会记忆它的元素的惰性Iterable ,那么折叠仍然太严格。

我试着用EphemeralStream第一个例子没有看到任何重大的堆压力,尽管它明显具有相同的“未执行的状态”。 区别在于EphemeralStream的元素被弱引用,其foldRight不会强制整个流。

我怀疑如果你使用Foldable.foldr ,那么你就不会看到有问题的行为,因为它会在第二个参数中使用懒惰函数折叠。 当你调用fold时,你希望它立即返回一个看起来像这样的暂停:

Suspend(() => head |+| tail.foldRightM(...))

当蹦床恢复第一次暂停并跑到下一次暂停时,暂停之间的所有分配将变为可供垃圾收集器释放。

尝试以下操作:

def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

对于一个蹦床Monad M ,这将会在不断的堆中运行,但是会为非蹦床monad溢出堆栈。

真正的问题是, Iterable对于太大而不适合内存的数据并不是一个好的抽象。 当然,你可以编写一个命令式的副作用程序,你可以在每次迭代之后明确地丢弃元素,或者使用一个懒惰的正确折叠。 这很有效,直到你想用另一个编写该程序。 我假设你在一个State单体中进行调查的全部理由是为了获得合成。

所以,你可以做什么? 以下是一些选项:

  • 使用ReducerMonoid及其组合,然后作为最后一步在命令式显式释放循环(或蹦床式懒惰右键折叠)中运行,在此之后组合是不可能的或预期的。
  • 使用Iteratee组合和Iteratee Enumerator来提供它们。
  • 用Scalaz-Stream写入组合流式传感器。
  • 这些选项中的最后一个是我将在一般情况下使用和推荐的选项。


    使用State或任何类似的monad并不是解决问题的好方法。 使用State是谴责大型集合堆栈/堆。 考虑从大集合构建的x: State[A,B]的值(例如通过折叠它)。 然后x可以在初始状态下的不同的值被评估A ,产生不同的结果。 所以x需要保留集合中包含的所有信息。 在纯粹的设置中, x不能忘记一些不会造成堆栈/堆栈的信息,因此计算出的任何内容都会保留在内存中,直到整个monadic值被释放为止,这只有在评估结果后才会发生。 所以x的内存消耗与集合的大小成正比。

    我相信一个适合这个问题的方法是使用功能迭代/管道/导管 。 这个概念(在这三个名字下引用)是为了处理大量的具有固定内存消耗的数据集合而发明的,并且使用简单的组合器来描述这些过程。

    我尝试使用Scalaz的Iteratees ,但是看起来这个部分还不成熟,像State那样受到堆栈溢出的影响(或者我没有正确使用它;代码在这里可用,如果有人感兴趣的话) 。

    然而,使用我的(还有点实验性的)scala导管库很简单( 免责声明:我是作者):

    import conduit._
    import conduit.Pipe._
    
    object Run extends App {
      // Define a sampling function as a sink: It consumes
      // data of type `A` and produces a vector of samples.
      def sampleI[A](k: Int): Sink[A, Vector[A]] =
        sampleI[A](k, 0, Vector())
    
      // Create a sampling sink with a given state. It requests
      // a value from the upstream conduit. If there is one,
      // update the state and continue (the first argument to `requestF`).
      // If not, return the current sample (the second argument).
      // The `Finalizer` part isn't important for our problem.
      private def sampleI[A](k: Int, n: Int, sample: Vector[A]):
                      Sink[A, Vector[A]] =
        requestF((x: A) => sampleI(k, n + 1, algorithmR(k, n + 1, sample, x)),
                 (_: Any) => sample)(Finalizer.empty)
    
    
      // The sampling algorithm copied from the question.
      val rand = new scala.util.Random()
    
      def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = {
        if (sample.size < k) {
          sample :+ x // must keep first k elements
        } else {
          val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
          if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
          else
            sample
        }
      }
    
      // Construct an iterable of all `short` values, pipe it into our sampling
      // funcition, and run the combined pipe.
      {
        print(runPipe(Util.fromIterable(Short.MinValue to Short.MaxValue) >->
              sampleI(10)))
      }
    }
    

    更新:使用State可以解决问题,但是我们需要为知道如何使用恒定空间的State实现自定义折叠:

    import scala.collection._
    import scala.language.higherKinds
    import scalaz._
    import Scalaz._
    import scalaz.std.iterable._
    
    object Run extends App {
      // Folds in a state monad over a foldable
      def stateFold[F[_],E,S,A](xs: F[E],
                                f: (A, E) => State[S,A],
                                z: A)(implicit F: Foldable[F]): State[S,A] =
        State[S,A]((s: S) => F.foldLeft[E,(S,A)](xs, (s, z))((p, x) => f(p._2, x)(p._1)))
    
    
      // Sample a lazy collection view
      def sampleS[F[_],A](k: Int, xs: F[A])(implicit F: Foldable[F]):
                      State[Int,Vector[A]] =
        stateFold[F,A,Int,Vector[A]](xs, update(k), Vector())
    
      // update using State monad
      def update[A](k: Int) = {
        (acc: Vector[A], x: A) => State[Int, Vector[A]] {
            n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
        }
      }
    
      def algorithmR[A](k: Int, n: Int, sample: Vector[A], x: A): Vector[A] = ...
    
      {
        print(sampleS(10, (Short.MinValue to Short.MaxValue)).eval(0))
      }
    }
    
    链接地址: http://www.djcxy.com/p/43199.html

    上一篇: Monadic fold with State monad in constant space (heap and stack)?

    下一篇: Is Haskell truly pure (is any language that deals with input and output outside the system)?