# Big Data & Streaming

ThinkLang provides tools for processing large collections through AI: batch processing, map/reduce, lazy dataset pipelines, text chunking, and streaming. Everything here is available from `thinklang` or the dedicated `thinklang/data` entry point.
## batch()

The most flexible option; you provide an arbitrary async processor:
```ts
import { batch, think, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

const result = await batch({
  items: reviews,
  processor: async (review) => {
    return think({ prompt: `Classify: "${review}"`, ...zodSchema(Sentiment) });
  },
  maxConcurrency: 5,
  costBudget: 1.00,
  onError: "continue",
  onProgress: (p) => console.log(`${p.completed}/${p.total} done`),
});

console.log(result.results);      // successful items
console.log(result.errors);       // failed items
console.log(result.totalCostUsd); // total cost
```

### batch() Options
| Option | Type | Default | Description |
|---|---|---|---|
| items | T[] | required | Items to process |
| processor | (item: T, index: number) => Promise&lt;U&gt; | required | Async processor function |
| maxConcurrency | number | 5 | Max parallel operations |
| costBudget | number | -- | USD threshold; processing stops once exceeded |
| onError | "fail-fast" \| "continue" | "continue" | Error handling strategy |
| onProgress | (progress) => void | -- | Progress callback |
| onItemComplete | (item, result) => void | -- | Per-item completion callback |
| abortSignal | AbortSignal | -- | Cancellation signal |
| rateLimit | number | -- | Minimum ms between item starts |
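The cancellation and pacing options compose with the rest. A minimal sketch of `abortSignal`, `rateLimit`, and `onItemComplete` together (the `urls` input and the 30-second timeout are illustrative, not part of the API):

```ts
import { batch } from "thinklang";

// Abort the whole run if it takes longer than 30 seconds,
// and start at most one item every 200 ms.
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 30_000);

const result = await batch({
  items: urls,
  processor: async (url) => (await fetch(url)).text(),
  rateLimit: 200,
  abortSignal: controller.signal,
  onItemComplete: (item, output) => console.log(`done: ${item}`),
});

clearTimeout(timeout);
```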
### BatchResult

| Field | Type | Description |
|---|---|---|
| results | U[] | Successfully processed items |
| errors | BatchError[] | Failed items with error details |
| totalItems | number | Total items in the input |
| successCount | number | Number of successful items |
| errorCount | number | Number of failed items |
| totalCostUsd | number | Total cost in USD |
| totalDurationMs | number | Total elapsed time in ms |
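All of these fields live on the resolved value, so a quick run report needs nothing beyond the table above:

```ts
const rate = ((result.successCount / result.totalItems) * 100).toFixed(1);
console.log(
  `${result.successCount}/${result.totalItems} succeeded (${rate}%), ` +
    `${result.errorCount} failed, $${result.totalCostUsd.toFixed(4)} in ${result.totalDurationMs} ms`
);
```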
## mapThink()

A simpler API for when you just need to apply `think()` to each item:
```ts
import { mapThink, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

const result = await mapThink({
  items: reviews,
  promptTemplate: (review) => `Classify sentiment: "${review}"`,
  ...zodSchema(Sentiment),
  maxConcurrency: 5,
});

console.log(result.results);
console.log(result.successCount);
console.log(result.errorCount);
```

## reduceThink()
Aggregate items with a tree reduction: items are split into batches, each batch is summarized by the LLM, and the summaries are then recursively reduced until a single result remains. With `batchSize: 5`, for example, 25 paragraphs yield five batch summaries in the first round and one combined summary in the second:
```ts
import { reduceThink } from "thinklang";

const summary = await reduceThink({
  items: paragraphs,
  prompt: "Combine these into a coherent summary",
  jsonSchema: { type: "string" },
  batchSize: 5,
  maxConcurrency: 3,
});
```

## Dataset
A lazy, chainable collection for building data pipelines. Stages are composed without running anything; the pipeline executes only when you call `.execute()`:
```ts
import { Dataset, think, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

const results = await Dataset.from(reviews)
  .map(async (review) => think({ prompt: `Classify: "${review}"`, ...zodSchema(Sentiment) }))
  .filter(async (sentiment) => sentiment.label === "positive")
  .execute({ maxConcurrency: 3, costBudget: 2.00 });

console.log(results.toArray());
console.log(results.length);
console.log(results.first());
```

### Methods
| Method | Description |
|---|---|
| Dataset.from(items) | Create a dataset from an array |
| Dataset.range(start, end) | Create a dataset from a number range |
| .map(fn) | Transform each item |
| .filter(fn) | Keep items where fn returns true |
| .flatMap(fn) | Transform and flatten |
| .batch(size) | Group items into chunks |
| .reduce(fn, initial) | Reduce to a single value |
| .execute(options) | Run the pipeline |
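The grouping combinators compose with the rest of the pipeline. A small sketch of `.batch()` and `.flatMap()`, assuming `Dataset.range(start, end)` yields the integers from `start` through `end`:

```ts
import { Dataset } from "thinklang";

const out = await Dataset.range(1, 9)
  .batch(3)                                                // [[1,2,3],[4,5,6],[7,8,9]]
  .map(async (chunk) => chunk.reduce((a, b) => a + b, 0))  // [6, 15, 24]
  .flatMap(async (sum) => [sum, sum * 2])                  // expand each sum
  .execute({ maxConcurrency: 2 });

console.log(out.toArray()); // [6, 12, 15, 30, 24, 48]
```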
### Execute Options

| Option | Type | Default | Description |
|---|---|---|---|
| maxConcurrency | number | 5 | Max parallel operations |
| costBudget | number | -- | USD threshold |
| onError | "fail-fast" \| "continue" | "continue" | Error handling strategy |
| rateLimit | number | -- | Minimum ms between item starts |
| abortSignal | AbortSignal | -- | Cancellation signal |
## Text Chunking

Split large text into chunks that fit within LLM context windows:
```ts
import { chunkText, estimateTokens } from "thinklang";

const { chunks, totalChunks } = chunkText(longArticle, {
  maxTokens: 1000,
  strategy: "paragraph", // "paragraph" | "sentence" | "fixed"
  overlap: 50,
});

console.log(`Split into ${totalChunks} chunks`);
console.log(`First chunk tokens: ~${estimateTokens(chunks[0])}`);
```
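Chunking pairs naturally with the batch helpers above. A sketch that summarizes each chunk in parallel with `mapThink()`, assuming `mapThink` accepts a raw `jsonSchema` the way `reduceThink` does:

```ts
import { chunkText, mapThink } from "thinklang";

// Split the article, then summarize every chunk concurrently.
const { chunks } = chunkText(longArticle, { maxTokens: 1000, strategy: "paragraph" });

const summaries = await mapThink({
  items: chunks,
  promptTemplate: (chunk) => `Summarize in two sentences:\n\n${chunk}`,
  jsonSchema: { type: "string" },
  maxConcurrency: 4,
});

console.log(summaries.results.join("\n"));
```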
### Array Chunking

```ts
import { chunkArray } from "thinklang";

const { chunks } = chunkArray(items, { chunkSize: 10 });
```

## Streaming
Process data incrementally using async generators:
```ts
import { streamThink, collectStream } from "thinklang";

for await (const event of streamThink({
  prompt: longText,
  jsonSchema: { type: "string" },
  chunkOptions: { maxTokens: 500, strategy: "paragraph" },
})) {
  console.log(`Chunk ${event.index + 1}/${event.totalChunks}: ${event.data}`);
}

// Or collect all at once
const allResults = await collectStream(streamThink({ prompt: text, jsonSchema: schema }));
```

## Vector Store
ThinkLang includes an in-memory vector store for RAG-style retrieval. The `SimpleEmbedding` provider uses hash-based embeddings that require no API calls, which makes it a good fit for demos and testing.
```ts
import { InMemoryVectorStore } from "thinklang";

const store = new InMemoryVectorStore();

// Add documents
await store.add([
  { id: "doc-1", text: "TypeScript is a statically typed superset of JavaScript.", metadata: { source: "docs" } },
  { id: "doc-2", text: "Python is widely used in data science and machine learning.", metadata: { source: "docs" } },
  { id: "doc-3", text: "Rust provides memory safety without garbage collection.", metadata: { source: "docs" } },
]);

// Query for similar documents
const results = await store.query("type safety and static typing", { topK: 2, minScore: 0.1 });
for (const { item, score } of results) {
  console.log(`[${score.toFixed(3)}] ${item.text}`);
}
```

### Indexing Text
Use `indexText()` to automatically chunk and index longer text:
```ts
import { InMemoryVectorStore, indexText } from "thinklang";

const store = new InMemoryVectorStore();

const chunkCount = await indexText(article, store, {
  strategy: "paragraph",
  idPrefix: "article",
  metadata: { source: "article.md" },
});

console.log(`Indexed ${chunkCount} chunks`);

const results = await store.query("testing and assertions", { topK: 1 });
```

### Query Options
| Option | Type | Default | Description |
|---|---|---|---|
| topK | number | 5 | Maximum number of results to return |
| minScore | number | 0 | Minimum similarity score threshold |
| filter | (item) => boolean | -- | Filter function applied before ranking |
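The `filter` option narrows candidates before similarity ranking. A short sketch reusing the documents added earlier, assuming the stored item (metadata included) is what the predicate receives:

```ts
// Only consider documents tagged as docs before ranking by similarity.
const docsOnly = await store.query("static typing", {
  topK: 3,
  minScore: 0.05,
  filter: (item) => item.metadata?.source === "docs",
});
```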
### Custom Embedding Provider

Implement the `EmbeddingProvider` interface to use real embeddings (e.g., the OpenAI embeddings API):
```ts
import OpenAI from "openai";
import { InMemoryVectorStore, type EmbeddingProvider } from "thinklang";

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

class OpenAIEmbedding implements EmbeddingProvider {
  dimensions = 1536; // output size of text-embedding-3-small

  async embed(texts: string[]): Promise<number[][]> {
    // Call the OpenAI embeddings API
    const response = await openai.embeddings.create({ model: "text-embedding-3-small", input: texts });
    return response.data.map((d) => d.embedding);
  }
}

const store = new InMemoryVectorStore(new OpenAIEmbedding());
```

## Import Paths
```ts
// From the main package
import { batch, mapThink, reduceThink, Dataset, chunkText, streamThink } from "thinklang";

// Or from the dedicated entry point
import { batch, mapThink, reduceThink, Dataset, chunkText, streamThink } from "thinklang/data";
```

## Next Steps
- Core Functions for `think`, `infer`, and `reason`
- Agents & Tools for agentic workflows
- Big Data (Language Guide) for the equivalent `.tl` syntax
- Runtime API Reference for complete type definitions