API Reference

Learn how to use the Preprocess API to upload files, process them asynchronously, and retrieve structured chunks for downstream tasks.

This guide walks through the core steps for parsing and chunking a document with the Preprocess service. You can call the Preprocess API directly or use the Python SDK to handle uploading, chunking, and retrieving results. It also covers two patterns for managing the asynchronous nature of the process, polling and webhooks, and explains why, in most cases, the asynchronous approach provides a better experience.

Understanding the Process

High-Level Flow:

  1. Send a file to the Preprocess API:
    You upload a file (PDF, DOCX, HTML, text, etc.) via the API or Python SDK.

  2. Asynchronous Chunking:
    The chunking process happens asynchronously. You receive a process_id immediately, while the service continues working behind the scenes.

  3. Retrieving Chunks:
    Once completed, the service returns a list of semantically meaningful text chunks. These can then power semantic search, retrieval-augmented generation, or fine-tuning tasks.

Why Asynchronous?

Scalability & Efficiency:
By nature, chunking can be time-consuming. With synchronous approaches, your client or server would sit idle, blocking and waiting for a response—this ties up resources and reduces scalability. An asynchronous approach frees you to handle other tasks or requests while the chunking occurs in the background.

Seamless Automation:
When you integrate asynchronous chunking with a webhook, the Preprocess service notifies you as soon as the chunks are ready. You don’t need to continuously poll or guess when to check for results. This pattern significantly simplifies automation, allowing your applications to seamlessly process large batches of documents without manual intervention.

Recommended for Production:
While polling may feel straightforward, it’s not always optimal in larger workflows, especially in production environments. Asynchronously receiving results via webhooks promotes a decoupled architecture, reduces overhead, and ensures that your application remains responsive to other user requests.

In short, adopting asynchronous strategies is a best practice that saves time, reduces complexity, and positions your system to handle growth and varying load.
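
To make the scalability point concrete, here is a minimal sketch using Python's asyncio. The fake_chunk_job coroutine is a hypothetical stand-in for a slow chunking request, not part of the Preprocess SDK; it only sleeps to simulate time spent waiting on the service. The sketch shows that the program can do other work while both jobs are still pending.

```python
import asyncio


async def fake_chunk_job(name, seconds, log):
    """Hypothetical stand-in for a slow chunking job (not a Preprocess SDK call)."""
    log.append(f"started {name}")
    await asyncio.sleep(seconds)  # simulates waiting on the remote service
    log.append(f"finished {name}")
    return name


async def main():
    log = []
    # Schedule both jobs without waiting for either one to finish.
    jobs = [
        asyncio.create_task(fake_chunk_job("a.pdf", 0.02, log)),
        asyncio.create_task(fake_chunk_job("b.pdf", 0.05, log)),
    ]
    # Yield once so the jobs can start, then carry on with unrelated work.
    await asyncio.sleep(0)
    log.append("handled another request")
    # Collect the results only when they are actually needed.
    results = await asyncio.gather(*jobs)
    return log, results


log, results = asyncio.run(main())
print(log)
```

The log shows the unrelated work being handled before either job finishes, which is the behavior a blocking call would prevent.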

Basic Example Using the Python SDK

Synchronous-Style (Blocking Polling)

If you just want a quick test, you can use a blocking polling approach. It suits only simple experiments or small-scale scenarios, because the calling process is tied up until chunking finishes.

from pypreprocess import Preprocess

# Initialize with your API key and the local file path
preprocess = Preprocess(api_key="YOUR_API_KEY", filepath="path/to/your_document.pdf")

# Start the chunking process
preprocess.chunk()

# Wait until the process finishes (this method internally polls)
preprocess.wait()

# Retrieve the result
result = preprocess.result()
chunks = result.data["chunks"]
print(chunks)

This code will block execution until the document is fully chunked. For large documents or multiple concurrent requests, this is not ideal.

Asynchronous Polling (Non-Blocking, External Script)

For a more production-friendly approach that still relies on polling, you can start chunking in one script and check the status at intervals from another. The submitting process no longer blocks, but something still has to poll for the result.

Script 1 (Initiate chunking):

# initiate_chunking.py
from pypreprocess import Preprocess

preprocess = Preprocess(api_key="YOUR_API_KEY", filepath="path/to/your_document.pdf")
preprocess.chunk()
process_id = preprocess.get_process_id()
print(f"Process initiated with ID: {process_id}")

Script 2 (Poll every 5 seconds):

# poll_for_result.py
import time
from pypreprocess import Preprocess

api_key = "YOUR_API_KEY"
process_id = "YOUR_PROCESS_ID_FROM_SCRIPT_1"

preprocess = Preprocess(api_key=api_key, process_id=process_id)

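# Poll at most 60 times (about 5 minutes; an arbitrary limit) so that a
# failed or stuck job cannot keep this loop running forever
for attempt in range(60):
    result = preprocess.result()
    if result.success and result.data.get("chunks"):
        print("Chunks have been retrieved successfully!")
        chunks = result.data["chunks"]
        print(chunks)
        break
    print("Processing not finished yet, waiting 5 more seconds...")
    time.sleep(5)
else:
    # Runs only if the loop ended without a break
    print("Gave up waiting for the result.")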

This approach is better than pure blocking, but you still need infrastructure to run a loop and check for results.
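
If you do poll, a reusable helper with exponential backoff and an overall timeout tends to be gentler on the service than a fixed interval. The sketch below is one possible shape for such a helper. The name poll_until_ready and all of its parameters are hypothetical, and it is exercised here with a fake fetch function rather than the real SDK.

```python
import time


def poll_until_ready(fetch, is_ready, interval=5.0, backoff=2.0,
                     max_interval=60.0, timeout=600.0, sleep=time.sleep):
    """Call fetch() until is_ready(result) is true; raise TimeoutError otherwise.

    The timeout counts the time spent sleeping between attempts.
    """
    waited = 0.0
    delay = interval
    while True:
        result = fetch()
        if is_ready(result):
            return result
        if waited >= timeout:
            raise TimeoutError(f"result not ready after waiting {waited:.0f}s")
        sleep(delay)
        waited += delay
        # Grow the delay after each miss, up to max_interval.
        delay = min(delay * backoff, max_interval)


# Demonstration with a fake service that is ready on the third attempt.
responses = iter([{"chunks": None}, {"chunks": None}, {"chunks": ["a", "b"]}])
delays = []  # records the requested sleeps instead of actually sleeping
result = poll_until_ready(
    fetch=lambda: next(responses),
    is_ready=lambda r: bool(r.get("chunks")),
    sleep=delays.append,
)
print(result["chunks"], delays)
```

With the SDK calls from Script 2, fetch would be preprocess.result and is_ready would apply the same check as the loop above (result.success and a non-empty "chunks" entry).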

Asynchronous (Webhook) - The Recommended Approach

For truly scalable and maintainable architectures, consider setting a webhook. With this approach, your application sends the file for processing and immediately continues running other tasks. When the chunking finishes, Preprocess will POST the results directly to your endpoint. There’s no polling loop to maintain, and you can easily handle multiple concurrent requests without tying up resources.

Client-side (to start the process):

from pypreprocess import Preprocess

# Using a webhook for an asynchronous callback
preprocess = Preprocess(api_key="YOUR_API_KEY", filepath="path/to/your_document.pdf")

# Specify a webhook URL that will receive the final result
preprocess.set_options({"webhook": "https://your-service.com/webhook-endpoint"})

# Start the chunking process
preprocess.chunk()

# Once the process completes, Preprocess will POST the result to your webhook URL.
# No waiting or manual checking necessary.

Server-side (your webhook endpoint):

The following example uses Flask, but you can implement the same endpoint with your favorite stack.

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhook-endpoint', methods=['POST'])
def webhook():
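    # silent=True returns None instead of raising when the body is not valid JSON
    data = request.get_json(silent=True) or {}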
    if data.get("success") is True:
        # Extract chunks
        chunks = data["data"]["chunks"]
        print("Received chunks:", chunks)
    else:
        print("Error or incomplete data received.")

    return jsonify({"status": "received"}), 200

if __name__ == '__main__':
    app.run(port=5000)

This method is efficient and strongly recommended, especially when dealing with large volumes or integrating into production systems. Your webhook endpoint can parse the POST data, save the chunks to a database, and trigger further processing—fully automating the flow.
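
As a sketch of the "save the chunks to a database" step, the function below stores a webhook payload's chunks in SQLite. Several details are assumptions made for illustration: the table layout, the save_chunks name, the "demo-123" identifier, and the idea that each chunk is a plain string (as the examples in this guide suggest). How you obtain the process ID inside your webhook depends on your own bookkeeping.

```python
import sqlite3


def save_chunks(conn, process_id, chunks):
    """Store chunks for one process and return how many rows it now has.

    INSERT OR IGNORE makes a repeated delivery of the same payload harmless.
    """
    with conn:  # commits on success, rolls back if an error is raised
        conn.execute(
            "CREATE TABLE IF NOT EXISTS chunks ("
            " process_id TEXT NOT NULL,"
            " position INTEGER NOT NULL,"
            " text TEXT NOT NULL,"
            " PRIMARY KEY (process_id, position))"
        )
        conn.executemany(
            "INSERT OR IGNORE INTO chunks (process_id, position, text)"
            " VALUES (?, ?, ?)",
            [(process_id, i, chunk) for i, chunk in enumerate(chunks)],
        )
    row = conn.execute(
        "SELECT COUNT(*) FROM chunks WHERE process_id = ?", (process_id,)
    ).fetchone()
    return row[0]


# Example payload in the same shape the Flask handler above reads.
payload = {"success": True, "data": {"chunks": ["First chunk.", "Second chunk."]}}

conn = sqlite3.connect(":memory:")  # use a file path for persistent storage
stored = save_chunks(conn, "demo-123", payload["data"]["chunks"])
stored_again = save_chunks(conn, "demo-123", payload["data"]["chunks"])
print(stored, stored_again)
```

Keying rows on the process ID and chunk position means the handler can safely run twice for the same document without creating duplicates.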

Integrating with Tools and Frameworks

Once you have the chunks, integration with libraries like LangChain or LlamaIndex is straightforward. The asynchronous pattern ensures that these subsequent steps only occur once the data is ready, enabling smooth, end-to-end pipelines.
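
The integration examples in this section read from a result.json file. One way to produce that file is to write the result payload to disk as soon as it arrives. The sketch below assumes the payload has the {"success": ..., "data": {"chunks": [...]}} shape used throughout this guide; the save_result helper is hypothetical, and a temporary directory is used so the example does not touch your working directory.

```python
import json
import tempfile
from pathlib import Path


def save_result(payload, path="result.json"):
    """Write the payload as JSON, replacing any existing file in one step."""
    path = Path(path)
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    # Rename only after the write has finished, so readers never see a partial file.
    tmp.replace(path)
    return path


payload = {"success": True, "data": {"chunks": ["First chunk.", "Second chunk."]}}

out_dir = Path(tempfile.mkdtemp())
saved = save_result(payload, out_dir / "result.json")
loaded = json.loads(saved.read_text(encoding="utf-8"))
print(saved.name, len(loaded["data"]["chunks"]))
```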

LangChain:

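# Requires the langchain-community and jq packages; older LangChain releases
# exposed this loader as langchain.document_loaders.JSONLoader
from langchain_community.document_loaders import JSONLoader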

loader = JSONLoader(file_path="result.json", jq_schema='.data.chunks[]')
documents = loader.load()

LlamaIndex:

import json
# llama-index 0.10 and later; older releases import these from llama_index
from llama_index.core import VectorStoreIndex, Document

with open('result.json', 'r') as f:
    result = json.load(f)

index = VectorStoreIndex([])
for chunk in result['data']['chunks']:
    index.insert(Document(text=chunk))
query_engine = index.as_query_engine()