# Big Data & Streaming

ThinkLang provides tools for processing large collections through AI: batch processing, map/reduce, lazy dataset pipelines, text chunking, and streaming. All are available from the main `thinklang` package or the dedicated `thinklang/data` entry point.

## batch()

The most flexible option: supply any async processor, and `batch()` handles concurrency, cost budgeting, progress reporting, and error collection:

```typescript
import { batch, think, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

// Sample input; any string[] works here
const reviews = [
  "Great product, works exactly as advertised.",
  "Arrived broken and support never replied.",
];

const result = await batch({
  items: reviews,
  processor: async (review) => {
    return think({ prompt: `Classify: "${review}"`, ...zodSchema(Sentiment) });
  },
  maxConcurrency: 5,
  costBudget: 1.00,
  onError: "continue",
  onProgress: (p) => console.log(`${p.completed}/${p.total} done`),
});

console.log(result.results);      // successful items
console.log(result.errors);       // failed items
console.log(result.totalCostUsd); // total cost
```

### batch() Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `items` | `T[]` | required | Items to process |
| `processor` | `(item: T, index: number) => Promise<U>` | required | Async processor function |
| `maxConcurrency` | `number` | `5` | Max parallel operations |
| `costBudget` | `number` | -- | USD threshold; processing stops once exceeded |
| `onError` | `"fail-fast" \| "continue"` | `"continue"` | Error handling strategy |
| `onProgress` | `(progress) => void` | -- | Progress callback |
| `onItemComplete` | `(item, result) => void` | -- | Per-item completion callback |
| `abortSignal` | `AbortSignal` | -- | Cancellation signal |
| `rateLimit` | `number` | -- | Minimum ms between item starts |
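Long runs can be cancelled with a standard `AbortController`. A minimal sketch, reusing the `Sentiment` schema above and assuming an aborted run resolves with whatever has completed rather than throwing:

```typescript
// Cancel a long batch run after a deadline. Assumes (not confirmed above)
// that an aborted run resolves with partial results instead of rejecting.
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 30_000); // cancel after 30 s

const partial = await batch({
  items: reviews,
  processor: (review) => think({ prompt: `Classify: "${review}"`, ...zodSchema(Sentiment) }),
  abortSignal: controller.signal,
  rateLimit: 200, // start at most one item every 200 ms
  onError: "continue",
});
clearTimeout(timer);
console.log(`${partial.successCount}/${partial.totalItems} completed before cancellation`);
```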

### BatchResult

| Field | Type | Description |
| --- | --- | --- |
| `results` | `U[]` | Successfully processed items |
| `errors` | `BatchError[]` | Failed items with error details |
| `totalItems` | `number` | Total items in the input |
| `successCount` | `number` | Number of successful items |
| `errorCount` | `number` | Number of failed items |
| `totalCostUsd` | `number` | Total cost in USD |
| `totalDurationMs` | `number` | Total elapsed time in ms |
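With `onError: "continue"`, failures are collected rather than thrown. The docs above only name the `BatchError` type, so this sketch simply logs each entry as-is:

```typescript
// Inspect failures after a "continue" run.
if (result.errorCount > 0) {
  for (const failure of result.errors) {
    console.warn("failed item:", failure); // BatchError shape not shown above
  }
}
```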

## mapThink()

A simpler API for when you just need to apply `think()` to each item:

```typescript
import { mapThink, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

const result = await mapThink({
  items: reviews,
  promptTemplate: (review) => `Classify sentiment: "${review}"`,
  ...zodSchema(Sentiment),
  maxConcurrency: 5,
});

console.log(result.results);
console.log(result.successCount);
console.log(result.errorCount);
```

## reduceThink()

Aggregate items using tree reduction: items are grouped into batches, each batch is summarized by the LLM, and the summaries are then recursively reduced until a single result remains:

```typescript
import { reduceThink } from "thinklang";

const summary = await reduceThink({
  items: paragraphs,
  prompt: "Combine these into a coherent summary",
  jsonSchema: { type: "string" },
  batchSize: 5,
  maxConcurrency: 3,
});
```
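Tree reduction trades one giant prompt for many small ones: one LLM call per batch at each level. A hypothetical helper (not part of ThinkLang) that counts those calls:

```typescript
// Counts the LLM calls a tree reduction makes for n items with a given
// batch size: one call per batch per level, until one summary remains.
function reductionCalls(n: number, batchSize: number): number {
  let calls = 0;
  while (n > 1) {
    const batches = Math.ceil(n / batchSize);
    calls += batches;
    n = batches;
  }
  return calls;
}

console.log(reductionCalls(100, 5)); // 20 + 4 + 1 = 25 calls
```

With `batchSize: 5`, 100 paragraphs cost 25 calls, but no single prompt has to fit more than five inputs at once.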

## Dataset

A lazy, chainable collection for building data pipelines. Operations are recorded but nothing runs until you call `.execute()`.

```typescript
import { Dataset, think, zodSchema } from "thinklang";
import { z } from "zod";

const Sentiment = z.object({
  label: z.enum(["positive", "negative", "neutral"]),
  score: z.number(),
});

const results = await Dataset.from(reviews)
  .map(async (review) => think({ prompt: `Classify: "${review}"`, ...zodSchema(Sentiment) }))
  .filter(async (sentiment) => sentiment.label === "positive")
  .execute({ maxConcurrency: 3, costBudget: 2.00 });

console.log(results.toArray());
console.log(results.length);
console.log(results.first());
```

### Methods

| Method | Description |
| --- | --- |
| `Dataset.from(items)` | Create a dataset from an array |
| `Dataset.range(start, end)` | Create a dataset from a number range |
| `.map(fn)` | Transform each item |
| `.filter(fn)` | Keep items where `fn` returns true |
| `.flatMap(fn)` | Transform and flatten |
| `.batch(size)` | Group items into chunks |
| `.reduce(fn, initial)` | Reduce to a single value |
| `.execute(options)` | Run the pipeline |
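The grouping methods compose the same way as `.map()` and `.filter()`. A sketch, assuming `.batch(size)` yields arrays of items, as "Group items into chunks" suggests:

```typescript
// Summarize reviews ten at a time: one think() call per group.
const summaries = await Dataset.from(reviews)
  .batch(10)
  .map(async (group) => think({ prompt: `Summarize:\n${group.join("\n")}`, jsonSchema: { type: "string" } }))
  .execute({ maxConcurrency: 3 });
```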

### Execute Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `maxConcurrency` | `number` | `5` | Max parallel operations |
| `costBudget` | `number` | -- | USD threshold |
| `onError` | `"fail-fast" \| "continue"` | `"continue"` | Error handling strategy |
| `rateLimit` | `number` | -- | Minimum ms between item starts |
| `abortSignal` | `AbortSignal` | -- | Cancellation signal |

## Text Chunking

Split large text to fit within LLM context windows:

```typescript
import { chunkText, estimateTokens } from "thinklang";

const { chunks, totalChunks } = chunkText(longArticle, {
  maxTokens: 1000,
  strategy: "paragraph", // "paragraph" | "sentence" | "fixed"
  overlap: 50,
});

console.log(`Split into ${totalChunks} chunks`);
console.log(`First chunk tokens: ~${estimateTokens(chunks[0])}`);
```
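Chunking composes naturally with the batch tools above: split the text, summarize the chunks in parallel, then merge. A sketch using only APIs shown on this page, reusing `longArticle` from the example above:

```typescript
import { batch, chunkText, reduceThink, think } from "thinklang";

// Split, then fan the chunks out to parallel summarization calls.
const { chunks } = chunkText(longArticle, { maxTokens: 1000, strategy: "paragraph", overlap: 50 });

const summarized = await batch({
  items: chunks,
  processor: (chunk) => think({ prompt: `Summarize:\n${chunk}`, jsonSchema: { type: "string" } }),
  maxConcurrency: 3,
});

// Merge the per-chunk summaries into one result.
const finalSummary = await reduceThink({
  items: summarized.results,
  prompt: "Combine these into a coherent summary",
  jsonSchema: { type: "string" },
});
```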

### Array Chunking

```typescript
import { chunkArray } from "thinklang";

const { chunks } = chunkArray(items, { chunkSize: 10 });
```

## Streaming

Process data incrementally using async generators:

```typescript
import { streamThink, collectStream } from "thinklang";

for await (const event of streamThink({
  prompt: longText,
  jsonSchema: { type: "string" },
  chunkOptions: { maxTokens: 500, strategy: "paragraph" },
})) {
  console.log(`Chunk ${event.index + 1}/${event.totalChunks}: ${event.data}`);
}

// Or collect all at once
const allResults = await collectStream(streamThink({ prompt: text, jsonSchema: schema }));
```
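Because `streamThink()` is an async generator, consumers can also stop early. Breaking out of the loop closes the generator; assuming chunks are processed lazily as they are consumed, no further chunks are sent to the model:

```typescript
// Stop after the first three chunks.
for await (const event of streamThink({
  prompt: longText,
  jsonSchema: { type: "string" },
  chunkOptions: { maxTokens: 500, strategy: "paragraph" },
})) {
  if (event.index >= 2) break;
  console.log(event.data);
}
```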

## Vector Store

ThinkLang includes an in-memory vector store for RAG-style retrieval. The default `SimpleEmbedding` provider uses hash-based embeddings that require no API calls, which makes it well suited to demos and testing.

```typescript
import { InMemoryVectorStore, indexText } from "thinklang";

const store = new InMemoryVectorStore();

// Add documents
await store.add([
  { id: "doc-1", text: "TypeScript is a statically typed superset of JavaScript.", metadata: { source: "docs" } },
  { id: "doc-2", text: "Python is widely used in data science and machine learning.", metadata: { source: "docs" } },
  { id: "doc-3", text: "Rust provides memory safety without garbage collection.", metadata: { source: "docs" } },
]);

// Query for similar documents
const results = await store.query("type safety and static typing", { topK: 2, minScore: 0.1 });
for (const { item, score } of results) {
  console.log(`[${score.toFixed(3)}] ${item.text}`);
}
```
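Retrieval composes with `think()` for a minimal RAG loop. A sketch using only the APIs documented on this page, reusing the `store` built above:

```typescript
import { think } from "thinklang";

// Retrieve the top matches, then ground the prompt in them.
const hits = await store.query("which language manages memory without GC?", { topK: 2 });
const context = hits.map(({ item }) => item.text).join("\n");

const answer = await think({
  prompt: `Answer using only this context:\n${context}\n\nWhich language manages memory without a garbage collector?`,
  jsonSchema: { type: "string" },
});
```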

### Indexing Text

Use `indexText()` to automatically chunk and index longer text:

```typescript
import { InMemoryVectorStore, indexText } from "thinklang";

const store = new InMemoryVectorStore();
const chunkCount = await indexText(article, store, {
  strategy: "paragraph",
  idPrefix: "article",
  metadata: { source: "article.md" },
});

console.log(`Indexed ${chunkCount} chunks`);
const results = await store.query("testing and assertions", { topK: 1 });
```

### Query Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `topK` | `number` | `5` | Maximum number of results to return |
| `minScore` | `number` | `0` | Minimum similarity score threshold |
| `filter` | `(item) => boolean` | -- | Filter function applied before ranking |
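The `filter` option narrows the candidate set by metadata before similarity ranking. For example, restricting results to items tagged with a particular source:

```typescript
// Only consider items whose metadata marks them as docs.
const docsOnly = await store.query("static typing", {
  topK: 3,
  filter: (item) => item.metadata?.source === "docs",
});
```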

### Custom Embedding Provider

Implement the `EmbeddingProvider` interface to use real embeddings (e.g., the OpenAI embeddings API):

```typescript
import { InMemoryVectorStore, type EmbeddingProvider } from "thinklang";
import OpenAI from "openai";

const openai = new OpenAI(); // reads OPENAI_API_KEY from the environment

class OpenAIEmbedding implements EmbeddingProvider {
  dimensions = 1536;
  async embed(texts: string[]): Promise<number[][]> {
    // Call OpenAI embeddings API
    const response = await openai.embeddings.create({ model: "text-embedding-3-small", input: texts });
    return response.data.map(d => d.embedding);
  }
}

const store = new InMemoryVectorStore(new OpenAIEmbedding());
```
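When swapping in a real provider, make sure `dimensions` matches the model: `text-embedding-3-small` returns 1536-dimensional vectors by default. Embeddings from different providers occupy different vector spaces, so re-index any existing documents after switching.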

## Import Paths

```typescript
// From main package
import { batch, mapThink, reduceThink, Dataset, chunkText, streamThink } from "thinklang";

// Or from dedicated entry point
import { batch, mapThink, reduceThink, Dataset, chunkText, streamThink } from "thinklang/data";
```
