A Coding Guide to Build a Scalable End-to-End Machine Learning Data Pipeline Using Daft for High-Performance Structured and Image Data Processing
In this tutorial, we explore how to use Daft as a high-performance, Python-native data engine to build an end-to-end analytical pipeline. We start by loading a real-world MNIST dataset, then progressively transform it using UDFs, feature engineering, aggregations, joins, and lazy execution. We also demonstrate how to seamlessly combine structured data processing, numerical computation, and machine learning. By the end, we are not just manipulating data; we are building a complete, model-ready pipeline powered by Daft’s scalable execution engine.
!pip -q install daft pyarrow pandas numpy scikit-learn
import os
os.environ["DO_NOT_TRACK"] = "true"
import numpy as np
import pandas as pd
import daft
from daft import col
print("Daft version:", getattr(daft, "__version__", "unknown"))
URL = "https://github.com/Eventual-Inc/mnist-json/raw/master/mnist_handwritten_test.json.gz"
df = daft.read_json(URL)
print("\nSchema (sampled):")
print(df.schema())
print("\nPeek:")
df.show(5)
We install Daft and its supporting libraries directly in Google Colab to ensure a clean, reproducible environment, disable optional telemetry, and verify the installed version. We then load a real-world MNIST JSON dataset directly from a remote URL using Daft’s native reader, and inspect the schema and a small preview of the rows to understand the structure and column types before applying any transformations.
def to_28x28(pixels):
    arr = np.array(pixels, dtype=np.float32)
    if arr.size != 784:
        return None
    return arr.reshape(28, 28)
df2 = (
    df
    .with_column(
        "img_28x28",
        col("image").apply(to_28x28, return_dtype=daft.DataType.python())
    )
    .with_column(
        "pixel_mean",
        col("img_28x28").apply(lambda x: float(np.mean(x)) if x is not None else None,
                               return_dtype=daft.DataType.float32())
    )
    .with_column(
        "pixel_std",
        col("img_28x28").apply(lambda x: float(np.std(x)) if x is not None else None,
                               return_dtype=daft.DataType.float32())
    )
)
print("\nAfter reshaping + simple features:")
df2.select("label", "pixel_mean", "pixel_std").show(5)
We reshape each raw 784-pixel array into a 28×28 image using a row-wise UDF, returning None for malformed rows. We then compute simple per-image statistics, the pixel mean and standard deviation, as our first engineered features. Previewing these columns lets us validate the transformation before moving on to richer feature extraction.
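The reshaping step can be checked in plain NumPy, independent of Daft. The 784-element ramp below is a synthetic stand-in for a real pixel row, used only for illustration:

```python
import numpy as np

def to_28x28(pixels):
    # Coerce the incoming pixel list to float32 and validate its length.
    arr = np.array(pixels, dtype=np.float32)
    if arr.size != 784:
        return None  # malformed row: signal with None instead of raising
    return arr.reshape(28, 28)

# A synthetic "image": the values 0..783 standing in for real pixels.
toy = list(range(784))
img = to_28x28(toy)
print(img.shape)                    # (28, 28)
print(round(float(img.mean()), 1))  # mean of 0..783 is 391.5
print(to_28x28([1, 2, 3]))          # None: wrong length is rejected
```

Returning None for bad rows (rather than raising) lets downstream UDFs skip those records gracefully, which is why the later feature lambdas guard with `if x is not None`.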
@daft.udf(return_dtype=daft.DataType.list(daft.DataType.float32()), batch_size=512)
def featurize(images_28x28):
    out = []
    for img in images_28x28.to_pylist():
        if img is None:
            out.append(None)
            continue
        img = np.asarray(img, dtype=np.float32)
        row_sums = img.sum(axis=1) / 255.0
        col_sums = img.sum(axis=0) / 255.0
        total = img.sum() + 1e-6
        ys, xs = np.indices(img.shape)
        cy = float((ys * img).sum() / total) / 28.0
        cx = float((xs * img).sum() / total) / 28.0
        vec = np.concatenate([
            row_sums,
            col_sums,
            np.array([cy, cx, img.mean() / 255.0, img.std() / 255.0], dtype=np.float32),
        ])
        out.append(vec.astype(np.float32).tolist())
    return out
df3 = df2.with_column("features", featurize(col("img_28x28")))
print("\nFeature column created (list[float]):")
df3.select("label", "features").show(2)
We implement a batch UDF that processes up to 512 images at a time and extracts a richer feature vector from each one: normalized row and column sums, the intensity centroid, and the global mean and standard deviation. By operating on batches instead of single rows, we amortize the Python overhead while expressing the logic in plain NumPy, converting raw image data into structured, model-friendly representations.
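The centroid math inside the batch UDF is easy to verify in isolation. This sketch uses a hypothetical 4×4 toy image (with the image side length 4 replacing 28 in the normalization), not data from the dataset:

```python
import numpy as np

def centroid_features(img, side):
    # Intensity-weighted centroid, normalized by the image side length.
    total = img.sum() + 1e-6        # small epsilon avoids division by zero on blank images
    ys, xs = np.indices(img.shape)  # per-pixel row and column indices
    cy = float((ys * img).sum() / total) / side
    cx = float((xs * img).sum() / total) / side
    return cy, cx

# Put all the "ink" in the bottom-right pixel of a 4x4 image:
img = np.zeros((4, 4), dtype=np.float32)
img[3, 3] = 255.0
cy, cx = centroid_features(img, side=4)
print(cy, cx)  # both ≈ 3/4 = 0.75
```

Normalizing by the side length keeps both centroid coordinates in [0, 1), so they sit on a comparable scale to the other normalized features in the vector.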
label_stats = (
    df3.groupby("label")
    .agg(
        col("label").count().alias("n"),
        col("pixel_mean").mean().alias("mean_pixel_mean"),
        col("pixel_std").mean().alias("mean_pixel_std"),
    )
    .sort("label")
)
print("\nLabel distribution + summary stats:")
label_stats.show(10)
df4 = df3.join(label_stats, on="label", how="left")
print("\nJoined label stats back onto each row:")
df4.select("label", "n", "mean_pixel_mean", "mean_pixel_std").show(5)
We group by label to compute per-class counts and average pixel statistics, then join those summary statistics back onto every row with a left join. This enriches each example with context about its class and demonstrates how we combine scalable aggregations and joins with advanced analytics within Daft.
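This aggregate-then-join enrichment pattern will be familiar from pandas. As a sanity check on the logic (not the Daft API), the same pattern on a toy pandas frame with made-up labels and values looks like this:

```python
import pandas as pd

df = pd.DataFrame({
    "label": [0, 0, 1, 1, 1],
    "pixel_mean": [10.0, 20.0, 30.0, 40.0, 50.0],
})

# Per-label summary: row count and average pixel_mean.
stats = (
    df.groupby("label")
      .agg(n=("label", "size"), mean_pixel_mean=("pixel_mean", "mean"))
      .reset_index()
)

# Join the per-label stats back onto every row, as the Daft left join does.
enriched = df.merge(stats, on="label", how="left")
print(enriched)
```

Every row of label 1 now carries `n = 3` and `mean_pixel_mean = 40.0`, mirroring how `df4` carries each class's summary statistics alongside the per-example features.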
small = df4.select("label", "features").collect().to_pandas()
small = small.dropna(subset=["label", "features"]).reset_index(drop=True)
X = np.vstack(small["features"].apply(np.array).values).astype(np.float32)
y = small["label"].astype(int).values
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, y_train)
pred = clf.predict(X_test)
acc = accuracy_score(y_test, pred)
print("\nBaseline accuracy (feature-engineered LogisticRegression):", round(acc, 4))
print("\nClassification report:")
print(classification_report(y_test, pred, digits=4))
out_df = df4.select("label", "features", "pixel_mean", "pixel_std", "n")
out_path = "/content/daft_mnist_features.parquet"
out_df.write_parquet(out_path)
print("\nWrote parquet to:", out_path)
df_back = daft.read_parquet(out_path)
print("\nRead-back check:")
df_back.show(3)
We materialize selected columns into pandas and train a baseline Logistic Regression model, evaluating its accuracy to validate the usefulness of our engineered features. We also persist the processed dataset to Parquet, completing our end-to-end pipeline from raw data ingestion to production-ready storage.
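The hand-off from Daft to scikit-learn hinges on one step: stacking the list-valued `features` column into a dense matrix. A minimal NumPy sketch of that step, using made-up three-element feature lists instead of the real 60-dimensional vectors:

```python
import numpy as np

# Each cell of the "features" column is a Python list of floats (as the UDF emits).
features_column = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]

# Convert each list to an array, then stack into an (n_rows, n_features) matrix.
X = np.vstack([np.asarray(f, dtype=np.float32) for f in features_column])
print(X.shape)  # (2, 3)
print(X.dtype)  # float32
```

This is the same shape contract `LogisticRegression.fit` expects: one row per example, one column per feature, with no ragged lists (which is why the pipeline drops rows whose features are None before stacking).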
In this tutorial, we built a production-style data workflow using Daft, moving from raw JSON ingestion to feature engineering, aggregation, model training, and Parquet persistence. We demonstrated how to integrate advanced UDF logic, perform efficient groupby and join operations, and materialize results for downstream machine learning, all within a clean, scalable framework. Through this process, we saw how Daft enables us to handle complex transformations while remaining Pythonic and efficient. We finished with a reusable, end-to-end pipeline that showcases how we can combine modern data engineering and machine learning workflows in a unified environment.