stream Method
The stream() method converts the collection into a ReadableStream, so items can be processed one at a time. This is particularly useful when dealing with large amounts of data that shouldn't be processed all at once, for example when each item triggers asynchronous work such as a network call or a file write.
Basic Syntax
collect(items).stream(): ReadableStream<T>
Basic Usage
import { collect } from 'ts-collect'

// Simple streaming
const numbers = collect([1, 2, 3, 4, 5])
const stream = numbers.stream()

// Process stream
const reader = stream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  console.log(value) // Logs each number
}

// Stream with processing
const products = collect([
  { id: 1, name: 'Widget A' },
  { id: 2, name: 'Widget B' }
])

// processProduct is an application-defined async function
for await (const chunk of products.stream()) {
  await processProduct(chunk)
}
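The `for await` form relies on the runtime's ReadableStream being async iterable, which is not the case everywhere. As a minimal sketch, assuming only the standard `getReader()` API, the reader loop can be wrapped in an async generator. The name `iterateStream` is hypothetical and not part of ts-collect.

```typescript
// Hypothetical helper (not part of ts-collect): exposes any ReadableStream
// as an async generator by driving a reader manually.
async function* iterateStream<T>(stream: ReadableStream<T>): AsyncGenerator<T> {
  const reader = stream.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      yield value
    }
  } finally {
    // Runs on normal completion and when the consumer breaks out early
    reader.releaseLock()
  }
}
```

With this in place, `for await (const item of iterateStream(collection.stream()))` should behave like the loop shown earlier, and the lock is released even if the loop exits early.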
Working with Objects
interface Product {
  id: string
  name: string
  price: number
  description: string
}

const products = collect<Product>([
  {
    id: '1',
    name: 'Widget',
    price: 100,
    description: 'A fantastic widget'
  },
  // ... many more products
])

// Process products streaming
const stream = products.stream()
const reader = stream.getReader()

try {
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // updateProduct is an application-defined async function
    await updateProduct(value)
  }
} finally {
  // Release the lock so the stream can be read or cancelled elsewhere
  reader.releaseLock()
}
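When only the first few items are needed, the reader can be cancelled instead of read to the end. The following is a sketch under the assumption that the stream behaves like a standard ReadableStream. `takeFromStream` is a hypothetical helper name.

```typescript
// Hypothetical helper: reads at most `limit` items, then cancels the stream
// so the source can stop producing.
async function takeFromStream<T>(stream: ReadableStream<T>, limit: number): Promise<T[]> {
  const reader = stream.getReader()
  const taken: T[] = []
  try {
    while (taken.length < limit) {
      const { done, value } = await reader.read()
      if (done) break
      taken.push(value)
    }
    // Signals that no further items are needed; a no-op if already closed
    await reader.cancel()
  } finally {
    reader.releaseLock()
  }
  return taken
}
```

Cancelling rather than just releasing the lock tells the underlying source that the remaining items will never be requested.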
Real-world Examples
Large Catalog Processor
interface CatalogItem {
  sku: string
  name: string
  description: string
  price: number
  images: string[]
  metadata: Record<string, unknown>
}

class CatalogProcessor {
  async processCatalog(
    catalog: Collection<CatalogItem>,
    batchSize: number = 100
  ): Promise<{
    processed: number
    failed: string[]
  }> {
    const stream = catalog.stream()
    const reader = stream.getReader()
    let processed = 0 // Counts attempted items; failures are reported in `failed`
    const failed: string[] = []
    let batch: CatalogItem[] = []

    try {
      while (true) {
        const { done, value } = await reader.read()

        if (done) {
          // Flush the final, possibly partial batch
          if (batch.length > 0) {
            await this.processBatch(batch, failed)
            processed += batch.length
          }
          break
        }

        batch.push(value)

        if (batch.length >= batchSize) {
          await this.processBatch(batch, failed)
          processed += batch.length
          batch = []
        }
      }
    } finally {
      reader.releaseLock()
    }

    return { processed, failed }
  }

  private async processBatch(
    items: CatalogItem[],
    failed: string[]
  ): Promise<void> {
    // Items within a batch run concurrently; one failure does not stop the rest
    await Promise.all(items.map(async (item) => {
      try {
        await this.processItem(item)
      } catch (error) {
        failed.push(item.sku)
        console.error(`Failed to process ${item.sku}:`, error)
      }
    }))
  }

  private async processItem(item: CatalogItem): Promise<void> {
    // Process individual catalog item
    await this.validateData(item)
    await this.enrichMetadata(item)
    await this.updateSearchIndex(item)
  }

  private async validateData(item: CatalogItem): Promise<void> {
    // Validation logic
  }

  private async enrichMetadata(item: CatalogItem): Promise<void> {
    // Metadata enrichment
  }

  private async updateSearchIndex(item: CatalogItem): Promise<void> {
    // Search index update
  }
}
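The batching logic in the processor can be separated from the per-item work. As a rough sketch, assuming a standard ReadableStream, an async generator can group items into fixed-size arrays. The name `batches` is hypothetical and not part of ts-collect.

```typescript
// Hypothetical helper: groups stream items into arrays of up to `size` items.
async function* batches<T>(stream: ReadableStream<T>, size: number): AsyncGenerator<T[]> {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  const reader = stream.getReader()
  let batch: T[] = []
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      batch.push(value)
      if (batch.length === size) {
        yield batch
        batch = []
      }
    }
    // Emit the final, possibly partial batch
    if (batch.length > 0) yield batch
  } finally {
    reader.releaseLock()
  }
}
```

A caller could then write `for await (const batch of batches(catalog.stream(), 100))` and keep only the batch handling inside the loop body.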
Order History Exporter
interface Order {
  id: string
  date: Date
  customer: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  total: number
}

class OrderExporter {
  async exportOrders(
    orders: Collection<Order>,
    outputStream: WritableStream<string>
  ): Promise<{
    exported: number
    errors: Array<{ id: string; error: string }>
  }> {
    const writer = outputStream.getWriter()
    const stream = orders.stream()
    const reader = stream.getReader()
    let exported = 0
    const errors: Array<{ id: string; error: string }> = []

    // Write CSV header
    await writer.write(this.getHeaderRow())

    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break

        try {
          await writer.write(this.formatOrder(value))
          exported++
        } catch (error) {
          errors.push({
            id: value.id,
            error: error instanceof Error ? error.message : 'Unknown error'
          })
        }
      }
    } finally {
      reader.releaseLock()
      await writer.close()
    }

    return { exported, errors }
  }

  private getHeaderRow(): string {
    return 'Order ID,Date,Customer,Items,Total\n'
  }

  // Assumes ISO 8601 dates and ';'-separated items; adjust to your CSV conventions
  private formatOrder(order: Order): string {
    return `${order.id},${order.date.toISOString()},${order.customer},` +
      `${this.formatItems(order.items)},${order.total}\n`
  }

  private formatItems(items: Order['items']): string {
    return items
      .map(item => `${item.productId}(${item.quantity})`)
      .join(';')
  }
}
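An export like this can also be expressed with the standard `pipeThrough` and `pipeTo` methods instead of a manual reader loop. The sketch below assumes a simplified row type, `OrderRow`, as a stand-in for `Order`. Both `OrderRow` and the function names are hypothetical.

```typescript
// Simplified, hypothetical stand-in for the Order interface
interface OrderRow {
  id: string
  customer: string
  total: number
}

// Builds a transform that emits a header line, then one CSV line per order
function toCsvLines(): TransformStream<OrderRow, string> {
  return new TransformStream<OrderRow, string>({
    start(controller) {
      controller.enqueue('Order ID,Customer,Total\n')
    },
    transform(order, controller) {
      controller.enqueue(`${order.id},${order.customer},${order.total}\n`)
    }
  })
}

async function exportCsv(
  orders: ReadableStream<OrderRow>,
  output: WritableStream<string>
): Promise<void> {
  // pipeTo closes `output` when `orders` ends and propagates errors
  await orders.pipeThrough(toCsvLines()).pipeTo(output)
}
```

The trade-off is that a failed write aborts the whole pipeline, whereas the reader loop above can record an error per order and continue.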
Type Safety
interface TypedItem {
  id: number
  value: string
}

const items = collect<TypedItem>([
  { id: 1, value: 'A' },
  { id: 2, value: 'B' }
])

// Type-safe streaming
const stream: ReadableStream<TypedItem> = items.stream()

// Process with type safety
const reader = stream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break

  // TypeScript knows value type
  const id: number = value.id
  const val: string = value.value
}
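Because the stream carries the item type, downstream stages can narrow it further. As an illustrative sketch, assuming the standard TransformStream API, a type guard can filter a stream and narrow its element type in one step. `filterStream` is a hypothetical helper name.

```typescript
// Hypothetical helper: keeps only items accepted by the type guard, so the
// resulting stream is typed as ReadableStream<S> rather than ReadableStream<T>.
function filterStream<T, S extends T>(
  stream: ReadableStream<T>,
  guard: (item: T) => item is S
): ReadableStream<S> {
  return stream.pipeThrough(
    new TransformStream<T, S>({
      transform(item, controller) {
        if (guard(item)) controller.enqueue(item)
      }
    })
  )
}
```

For a stream of `string | number` items, passing a guard such as `(x): x is string => typeof x === 'string'` yields a `ReadableStream<string>`.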
Return Value
- Returns a ReadableStream of collection items
- Stream yields one item at a time
- Maintains original item types
- Supports async iteration (for await...of) in runtimes where ReadableStream is async iterable
- Preserves item order
- Lets consumers work through large datasets incrementally instead of all at once
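The properties listed above can be illustrated with a minimal stand-in that turns an array into a ReadableStream. This is a sketch only, not ts-collect's actual implementation. `arrayToStream` is a hypothetical name.

```typescript
// Hypothetical stand-in (not ts-collect's implementation): yields array
// items one at a time, in their original order.
function arrayToStream<T>(items: readonly T[]): ReadableStream<T> {
  let index = 0
  return new ReadableStream<T>({
    // pull() is invoked when the stream's internal queue needs more data
    pull(controller) {
      if (index < items.length) {
        controller.enqueue(items[index++])
      } else {
        controller.close()
      }
    }
  })
}
```

Using `pull` rather than enqueuing everything in `start` means items are handed over only as the consumer asks for them.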
Common Use Cases
1. Large Data Processing
   - Catalog updates
   - Order processing
   - Log analysis
   - Data migration
   - Batch operations
2. Data Export
   - CSV generation
   - Report creation
   - Data dumps
   - Backup creation
   - Archive generation
3. Import Operations
   - Data import
   - Catalog updates
   - Price updates
   - Inventory sync
   - Product updates
4. Content Processing
   - Image processing
   - Content updates
   - Media handling
   - Document processing
   - File operations
5. Batch Operations
   - Order processing
   - Invoice generation
   - Email sending
   - Report creation
   - Data updates
6. Data Migration
   - System updates
   - Platform migration
   - Data transfer
   - Content moving
   - Archive creation
7. Resource Management
   - Memory optimization
   - Processing control
   - Resource utilization
   - Load management
   - Performance tuning
8. Reporting
   - Large reports
   - Data analysis
   - Statistics generation
   - Performance metrics
   - Usage tracking
9. Integration
   - API integration
   - System sync
   - Data exchange
   - Service integration
   - Platform connection
10. Performance Optimization
   - Memory usage
   - Processing speed
   - Resource efficiency
   - Load handling
   - Scalability
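Several of the use cases above (processing control, load management) come down to limiting how many items are in flight at once. A minimal sketch, assuming a standard ReadableStream and an application-supplied `worker` function, could look like this. `processWithLimit` is a hypothetical helper name.

```typescript
// Hypothetical helper: runs `worker` on every stream item with at most
// `limit` invocations active at once, and returns the errors it collected.
async function processWithLimit<T>(
  stream: ReadableStream<T>,
  limit: number,
  worker: (item: T) => Promise<void>
): Promise<unknown[]> {
  const reader = stream.getReader()
  const errors: unknown[] = []

  // Each runner requests its next item only after finishing the current one
  const runner = async (): Promise<void> => {
    while (true) {
      const { done, value } = await reader.read()
      if (done) return
      try {
        await worker(value)
      } catch (error) {
        errors.push(error)
      }
    }
  }

  try {
    const count = Math.max(1, Math.floor(limit))
    await Promise.all(Array.from({ length: count }, () => runner()))
  } finally {
    reader.releaseLock()
  }
  return errors
}
```

Errors are collected rather than rethrown so that one failing item does not leave other reads pending when the lock is released.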