Use Preprocess in Your Custom Stack
Integrating Preprocess into your custom ingestion pipeline can streamline the preparation of documents for downstream natural language processing (NLP) tasks. Whether you’re ingesting files into a vector database, indexing them with TF-IDF, or preparing them for fine-tuning a language model, Preprocess helps transform raw documents into semantically meaningful chunks.
This guide shows how you can integrate Preprocess—both via the API and the Python SDK—into a data pipeline. It covers how to:
- Connect Preprocess to your data source
- Trigger the chunking process
- Retrieve and utilize the resulting chunks
When and How to Start the Ingestion
You can integrate Preprocess at different stages of your pipeline. For example:
- From an External Knowledge Base: If you have a knowledge base or CMS that holds documents, you can periodically fetch new or updated documents and pass them to Preprocess for chunking, as sketched after this list.
- From a Dashboard with Drag & Drop: If your application’s interface allows users to upload files directly, trigger the Preprocess ingestion as soon as the user drops a file. Preprocess will handle the file asynchronously, and once ready, you can use the results immediately in downstream tasks.
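As a rough illustration of the first pattern, a periodic sync job might look like the sketch below. The fetch_updated_documents helper and the hourly interval are hypothetical placeholders for your own knowledge-base client; the Preprocess calls match the SDK usage shown in the next section.
import time
from pypreprocess import Preprocess

def fetch_updated_documents():
    # Hypothetical helper: return local file paths for documents added or
    # changed in your knowledge base since the last sync.
    return []

while True:
    for filepath in fetch_updated_documents():
        preprocess = Preprocess(api_key="YOUR_API_KEY", filepath=filepath)
        preprocess.chunk()
        result = preprocess.wait()
        chunks = result.data["chunks"]
        # Hand the chunks to your downstream indexing step here.
    time.sleep(3600)  # hypothetical interval: sync once per hour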
Using the Python SDK
The Python SDK simplifies the chunking process. You can directly feed a file and wait for the results:
from pypreprocess import Preprocess
# Initialize with your API key and a local file path
preprocess = Preprocess(api_key="YOUR_API_KEY", filepath="path/to/your/document.pdf")
# Start the chunking process
preprocess.chunk()
# Wait for the process to complete
result = preprocess.wait()
# Retrieve the chunks
chunks = result.data["chunks"]
At this point, chunks is a list of text segments that respect the semantic structure of the original document. These chunks are ideal for:
- Inserting into a vector database (e.g., Pinecone, Qdrant, Weaviate, Chroma)
- Indexing with TF-IDF or a BM25-based search engine
- Feeding into a fine-tuning pipeline for a language model
Example: Insert Chunks into a Vector Database (Pinecone)
import pinecone
from sentence_transformers import SentenceTransformer
# Initialize Pinecone
pinecone.init(api_key="PINECONE_API_KEY", environment="YOUR_ENVIRONMENT")
index = pinecone.Index("my-vector-index")
# Initialize an embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
# Convert chunks to embeddings
embeddings = model.encode(chunks).tolist()
# Upsert chunks with their embeddings into Pinecone, keeping the chunk
# text as metadata so it can be returned at query time
vectors = [
    (f"chunk-{i}", embedding, {"text": chunk})
    for i, (chunk, embedding) in enumerate(zip(chunks, embeddings))
]
index.upsert(vectors=vectors)
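Once the vectors are upserted, retrieval is a matter of embedding the query with the same model and asking Pinecone for the nearest chunks. A brief sketch with the same client; the query string is a placeholder:
# Embed the query with the same model used for the chunks
query_embedding = model.encode("your search query").tolist()

# Fetch the three closest chunks along with their stored text
results = index.query(vector=query_embedding, top_k=3, include_metadata=True)
for match in results.matches:
    print(match.id, match.score, match.metadata["text"])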
Example: Index with TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(chunks)
# You can now store `tfidf_matrix` and `vectorizer.vocabulary_`
# for fast lexical searches against your chunks.
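To search this index, transform the query with the same fitted vectorizer and rank chunks by cosine similarity; a minimal sketch, with a placeholder query:
from sklearn.metrics.pairwise import cosine_similarity

# Project the query into the same TF-IDF space as the chunks
query_vector = vectorizer.transform(["your search query"])

# Rank chunks by similarity and show the top three
scores = cosine_similarity(query_vector, tfidf_matrix).ravel()
for i in scores.argsort()[::-1][:3]:
    print(scores[i], chunks[i])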
Example: Prepare for Fine-Tuning
If you’re fine-tuning an LLM, you might need the chunks as training examples. For instance, you can save them as a JSONL file where each line is a training sample:
import json
with open("training_data.jsonl", "w") as f:
for chunk in chunks:
# Structure the chunk data as needed for your fine-tuning framework
record = {"prompt": "Context:", "completion": chunk}
f.write(json.dumps(record) + "\n")
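If your fine-tuning framework expects chat-style records instead (for example, message lists in the OpenAI format), the same loop only needs a different record shape. The message contents below are hypothetical placeholders:
import json

with open("training_data_chat.jsonl", "w") as f:
    for chunk in chunks:
        record = {
            "messages": [
                {"role": "system", "content": "Answer using the provided context."},
                {"role": "user", "content": "Provide the reference passage."},
                {"role": "assistant", "content": chunk},
            ]
        }
        f.write(json.dumps(record) + "\n")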
Using the API Directly (Without the SDK)
If you prefer not to use the SDK, you can integrate Preprocess directly via HTTP requests. For example, to start processing a document, send a POST request with multipart/form-data:
import requests
url = "https://chunk.ing/?webhook=https://your_webhook.url"
headers = {"x-api-key": "YOUR_API_KEY"}
files = {"file": open("path/to/your/document.pdf", "rb")}
response = requests.post(url, headers=headers, files=files)
data = response.json()
if data.get("success"):
process_id = data["data"]["process"]["id"]
print("Process started with ID:", process_id)
else:
print("Error:", data["message"])
If you cannot set up a webhook, you can poll the result endpoint instead:
import time
result_url = f"https://chunk.ing/get_result?id={process_id}"
while True:
    res = requests.post(result_url, headers={"x-api-key": "YOUR_API_KEY"})
    result_data = res.json()
    if result_data.get("success"):
        # Chunking finished successfully
        chunks = result_data["data"]["chunks"]
        # Integrate chunks into your pipeline
        break
    else:
        # Not finished yet, wait a bit before polling again
        time.sleep(5)
You can then use the retrieved chunks in the same manner as in the SDK examples: embedding them, indexing them, or preparing them for fine-tuning tasks.
Best Practices
- Batch Processing: If you have multiple documents, run them in parallel or in batches. Store their process_ids and retrieve the results later, as sketched below.
- Logging & Monitoring: Store the process_id and logs so you can track document ingestion states and handle any failures gracefully.
- Security & Access Control: Ensure that only authorized users or systems can trigger the ingestion process. Use your API key securely and rotate it periodically in line with security best practices.
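As a rough sketch of the batch pattern, using the same chunk.ing endpoints as above and assuming the upload endpoint also accepts requests without the webhook parameter (since we poll here); the file list is a hypothetical placeholder:
import requests

headers = {"x-api-key": "YOUR_API_KEY"}
filepaths = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]  # hypothetical batch

# Submit every file first and keep the returned process IDs
process_ids = []
for path in filepaths:
    with open(path, "rb") as fh:
        response = requests.post("https://chunk.ing/", headers=headers, files={"file": fh})
    data = response.json()
    if data.get("success"):
        process_ids.append(data["data"]["process"]["id"])

# Later: retrieve and handle the result for each stored ID
for process_id in process_ids:
    res = requests.post(f"https://chunk.ing/get_result?id={process_id}", headers=headers)
    result_data = res.json()
    if result_data.get("success"):
        chunks = result_data["data"]["chunks"]
        # Integrate chunks into your pipeline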