
今天看的是 PySpark 中惰性求值(lazy evaluation)的实现。Python 和 Scala 不同,并不是一门函数式语言,那 PySpark 是怎么做到惰性求值的呢?


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """


def map(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each element of this RDD.

    This method touches no data itself: it wraps ``f`` into a per-partition
    function and delegates to ``mapPartitionsWithIndex``.

    :param f: function applied to every element.
    :param preservesPartitioning: passed through unchanged to
        ``mapPartitionsWithIndex``.
    :return: whatever ``mapPartitionsWithIndex`` returns (the new RDD).

    >>> rdd = sc.parallelize(["b", "a", "c"])
    >>> sorted(rdd.map(lambda x: (x, 1)).collect())
    [('a', 1), ('b', 1), ('c', 1)]
    """
    def func(_, iterator):
        # The partition index is ignored; f is applied element by element.
        # NOTE(review): `imap` is not imported in this excerpt; presumably it
        # is itertools.imap (Python 2), which applies f lazily — confirm.
        return imap(f, iterator)
    return self.mapPartitionsWithIndex(func, preservesPartitioning)

def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    This method does not call ``f``; it only passes it, together with this
    RDD as the parent, to a new ``PipelinedRDD``.

    :param f: called as ``f(splitIndex, iterator)``; must return an iterable
        (e.g. a generator, as in the example below).
    :param preservesPartitioning: forwarded to ``PipelinedRDD``.
    :return: a new ``PipelinedRDD`` whose parent is this RDD.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
    """
    return PipelinedRDD(self, f, preservesPartitioning)

对于 map、filter 这类转换(transformation)函数来说,它们每次调用都会产生一个叫做 PipelinedRDD 的对象。那么这个 PipelinedRDD 又是做什么的呢?

class PipelinedRDD(RDD):
    """
    An RDD produced by a Python-side transformation such as ``map``.

    When the parent is itself a pipelinable PipelinedRDD, the new function is
    composed with the parent's function, and this RDD keeps reading from the
    parent's upstream JVM RDD. A chain of such transformations therefore
    becomes one Python function applied to each partition's iterator.
    Constructing it only records functions, references and settings; the
    JVM-side RDD handle (``_jrdd_val``) is left as None here.

    Pipelined maps:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:

    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """

    def __init__(self, prev, func, preservesPartitioning=False):
        """
        :param prev: parent RDD.
        :param func: called as ``func(split, iterator)``.
        :param preservesPartitioning: whether ``func`` keeps the parent's
            partitioner; when false, ``self.partitioner`` is set to None.
        """
        # NOTE(review): `_is_pipelinable` is not defined in this excerpt; it
        # must be provided elsewhere on PipelinedRDD.
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # Start of a new pipeline: read directly from the parent's JVM RDD.
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            # Fuse with the parent: run prev's function, then ours, on the
            # same partition iterator.
            prev_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            # The fused stage keeps partitioning only if both stages do.
            self.preservesPartitioning = (
                prev.preservesPartitioning and preservesPartitioning)
            # Skip the parent and read from its source: maintain the pipeline.
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None  # JVM RDD handle; not created in the constructor
        self._id = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False
        self.partitioner = prev.partitioner if self.preservesPartitioning else None
        self._broadcast = None


def mean(self):
    """
    Compute the mean of this RDD's elements.

    Delegates to ``stats()`` and returns the mean held by the resulting
    counter.

    >>> sc.parallelize([1, 2, 3]).mean()
    2.0
    """
    return self.stats().mean()

def stats(self):
    """
    Return a L{StatCounter} object that captures the mean, variance
    and count of the RDD's elements in one operation.

    Each partition is summarized into a single StatCounter via
    ``mapPartitions``. The per-partition counters are then combined pairwise
    with ``mergeStats`` through ``reduce``. (In Spark, ``reduce`` is an
    action, so presumably this is where the deferred transformations are
    finally executed — confirm.)
    """
    def redFunc(left_counter, right_counter):
        # Merge two partial summaries into one.
        return left_counter.mergeStats(right_counter)

    return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)


@property
def _jrdd(self):
    """
    Build (once) and return the JVM-side RDD for this Python pipeline.

    ``self.func`` is bundled with the optional profiler and the input and
    output serializers. The bundle is prepared by
    ``_prepare_for_python_RDD``, and the result is passed to the JVM
    ``PythonRDD`` constructor, which wraps ``self._prev_jrdd``. The Java RDD
    is cached in ``self._jrdd_val`` and reused on later accesses.

    It is a property because callers read it as an attribute, e.g.
    ``prev._jrdd`` followed by ``.rdd()``.
    """
    if self._jrdd_val is not None:
        return self._jrdd_val
    if self._bypass_serializer:
        self._jrdd_deserializer = NoOpSerializer()

    # Create a profiler only when the context has a profiler collector.
    if self.ctx.profiler_collector:
        profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
    else:
        profiler = None

    # Function (possibly several fused stages), profiler, input deserializer
    # and output serializer.
    command = (self.func, profiler, self._prev_jrdd_deserializer,
               self._jrdd_deserializer)
    # pickled_cmd is presumably the serialized command; it must reach the JVM.
    pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
    # NOTE(review): the JVM PythonRDD constructor signature differs between
    # Spark releases (some also take the Python executable/version); confirm
    # this argument list against the targeted Spark version.
    python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
                                         bytearray(pickled_cmd),
                                         env, includes, self.preservesPartitioning,
                                         bvars, self.ctx._javaAccumulator)
    self._jrdd_val = python_rdd.asJavaRDD()

    if profiler:
        # Register the profiler under this RDD's id.
        self._id = self._jrdd_val.id()
        self.ctx.profiler_collector.add_profiler(self._id, profiler)
    return self._jrdd_val