smallpond浅探

Deepseek AI开源的数据处理框架
May 01, 2025 · 13 min read

Smallpond 是DeepSeek开源的一个基于DuckDB的轻量级数据处理框架。在Andy Pavlo教授的年度回顾中 Databases in 2024: A Year in Review, 他就提到在OLAP领域中,其实绝大多数的query处理的数据量都很小,中位数在100MB(而目前AWS上EC2提供的最大内存的实例为24TB),这意味着使用单实例的 DuckDB已经足够。更不必说使用DuckDB同时意味着原生就有了列式存储和向量执行的支持,性能优越。如今很多数据库都在开始集成DuckDB作为查询引擎。

Smallpond将DuckDB引入了数据处理领域,和Spark来到了同一战场。这里面就有很多值得一探的地方了。

Smallpond的编程API

Smallpond有一套自己的DataFrame定义,针对DataFrame有map, filter, flat_map, repartition等等操作,所有操作都是惰性执行的,和Spark高度类似。 每个DataFrame都包含一个logical plan,map/filter这些操作会生成一个新的DataFrame,持有一个变更过的logical plan。

每一个plan都是一个Node,同时Node有自己的input_deps,可以追溯到直接依赖的上游Node。Node为点,input_deps为边,就共同构成一个DAG。 举例来说,像df2=df1.map(col=1)这样一段代码,df2的plan就是SqlEngineNode,并且这个SqlEngineNode通过input_deps参数,指向df1的Node。 df1可以是一个单独的读取文件系统的Node,也可以是已经经过多次操作嵌套的plan。

Node具体的类型,和Spark大差不差。有类似DataSourceNode, ShuffleNode, HashPartitionNode, SqlEngineNode, DataSinkNode等等。 所有的Node都持有同一session的Context,Context里面有node_id的计数,会自动自增。

注意到像map, filter这些操作,在Spark中就是一个Project或者Filter算子,但是在Smallpond中,都是SqlEngineNode。SqlEngineNode并行地在每个 partition上通过调用DuckDB来执行。map对应的sql就是select {expr} from {df}, filter对应select * from {df} where ({expr})

Smallpond支持SQL API吗?

Spark的SQL API和DataFrame API,都可以解析成相同的Logical Plan。就SQL而言,一个复杂的SQL是会拆成多个任务来执行,中间可能涉及到shuffle。 这也是从MapReduce, Hive一路继承而来的。

而Smallpond是不会自动完成任务的拆分的。比如Readme里Quick Start的例子:

import smallpond

# Initialize session
sp = smallpond.init()

# Load data
df = sp.read_parquet("prices.parquet")

# Process data
df = df.repartition(3, hash_by="ticker")
df = sp.partial_sql("SELECT ticker, min(price), max(price) FROM {0} GROUP BY ticker", df)

# Save results
df.write_parquet("output/")
# Show results
print(df.to_pandas())

要执行的sql是:

SELECT ticker, min(price), max(price) FROM {0} GROUP BY ticker

就Spark而言,我们知道这里会用ticker作为key,对整个数据集进行shuffle,按spark.sql.shuffle.partitions参数进行自动分为若干个分区, 再并行地对每个分区上的结果执行计算。

而Smallpond需要用户显式地根据ticker来做repartition。这也是为啥它的操作叫做partial_sql. 如果这里用户指定的hash_by字段不对, 或者使用的是其它的分区策略,那么出来的计算结果就是错的。

partial_sql的函数注释里也明确了,如果要对多个df做join,用户需要自己保证两个df的是按照join key具有相同的分区的。

这也是Smallpond目前和Spark最大功能差距。暂时Smallpond还是一个轻量级数据处理框架,并不能完全覆盖整个数据处理的场景。 编写代码的用户需要对底层分布式执行的细节有更大程度的理解。

Smallpond的Optimizer做了什么

前面提到,DataFrame是惰性计算的,针对DataFrame的每个操作,都在现有plan的基础上,继续添加节点。

而最终DataFrame在执行之前,会走Optimizer生成optimized_plan

self.optimized_plan = Optimizer(exclude_nodes=set(self.session._node_to_tasks.keys())).visit(self.plan)

Optimizer目前只实现了 visit_query_engine_node,也就是针对DuckDB的算子。

def visit_query_engine_node(self, node: SqlEngineNode, depth: int) -> Node:
    # fuse consecutive SqlEngineNodes
    if len(node.input_deps) == 1 and isinstance(
        child := self.visit(node.input_deps[0], depth + 1), SqlEngineNode
    ):
        fused = copy.copy(node)
        fused.input_deps = child.input_deps
        fused.udfs = node.udfs + child.udfs
        fused.cpu_limit = max(node.cpu_limit, child.cpu_limit)
        fused.gpu_limit = max(node.gpu_limit, child.gpu_limit)
        fused.memory_limit = (
            max(node.memory_limit, child.memory_limit)
            if node.memory_limit is not None and child.memory_limit is not None
            else node.memory_limit or child.memory_limit
        )
        # merge the sql queries
        # example:
        # ```
        # child.sql_queries = ["select * from {0}"]
        #  node.sql_queries = ["select a, b from {0}"]
        # fused.sql_queries = ["select a, b from (select * from {0})"]
        # ```
        fused.sql_queries = child.sql_queries[:-1] + [
            query.format(f"({child.sql_queries[-1]})") for query in node.sql_queries
        ]
        return fused
    return self.generic_visit(node, depth)

这里具体的优化逻辑是,如果连续两个算子都是sql算子,那么通过子查询的形式,将sql合并,两个算子变成一个算子,少调用一次DuckDB,算是一个非常直接易懂的优化规则。

Planner: Smallpond和Ray的集成

生成optimized plan之后,planner进一步生成执行计划

# create tasks for the optimized plan
planner = Planner(self.session._runtime_ctx)
# let planner update self.session._node_to_tasks
planner.node_to_tasks = self.session._node_to_tasks
return planner.visit(self.optimized_plan)

具体来说,Planner把Optimized Plan里的Node,转换为Task。Task基本就是针对Ray Core里的Low Level API: Ray Task的抽象。

task.run_on_ray(), 会创建在ray上的执行任务, task在ray上执行完成后,会返回一个DataSet。

@ray.remote
def exec_task(task: Task, *inputs: DataSet) -> DataSet:
    import multiprocessing as mp
    import os
    from pathlib import Path

    from loguru import logger

    # ray use a process pool to execute tasks
    # we set the current process name to the task name
    # so that we can see task name in the logs
    mp.current_process().name = task.key

    # probe the retry count
    task.retry_count = 0
    while os.path.exists(task.ray_marker_path):
        task.retry_count += 1
        if task.retry_count > DEFAULT_MAX_RETRY_COUNT:
            raise RuntimeError(f"task {task.key} failed after {task.retry_count} retries")
    if task.retry_count > 0:
        logger.warning(f"task {task.key} is being retried for the {task.retry_count}th time")
    # create the marker file
    Path(task.ray_marker_path).touch()

    # put the inputs into the task
    assert len(inputs) == len(task.input_deps)
    task.input_datasets = list(inputs)
    # execute the task
    status = task.exec()
    if status != WorkStatus.SUCCEED:
        raise task.exception or RuntimeError(f"task {task.key} failed with status {status}")

    # dump the output dataset atomically
    os.makedirs(os.path.dirname(task.ray_dataset_path), exist_ok=True)
    dump(task.output, task.ray_dataset_path, atomic_write=True)
    return task.output

DataFrame API -> Logical Plan -> Optimized Plan -> Execution Task,算是一个非常典型的数据处理框架的架构。而针对DuckDB的使用, 是非常具有启发性的。

Abstraction is good, but don't forget reality.

Copyright © reata.github.io 2025, powered by GastbyJS & Material UI