stream Method

The stream() method converts the collection into a ReadableStream that yields its items one at a time. This is useful for large datasets where per-item work (network calls, file writes, index updates) should proceed incrementally instead of being started for every item at once.

Basic Syntax

typescript
collect(items).stream(): ReadableStream<T>
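
To make the signature concrete, here is a minimal sketch of how an in-memory array can back a ReadableStream using only the standard Streams API. The helpers `arrayToStream` and `readAll` are hypothetical names introduced for illustration; this is not ts-collect's actual implementation, only one plausible shape for it.

```typescript
// Hypothetical helper, NOT ts-collect's implementation: an array-backed
// ReadableStream built with the standard Streams API.
function arrayToStream<T>(items: readonly T[]): ReadableStream<T> {
  let index = 0
  return new ReadableStream<T>({
    // pull() is invoked when the consumer is ready for more data
    pull(controller) {
      if (index < items.length) {
        controller.enqueue(items[index] as T)
        index++
      } else {
        controller.close()
      }
    },
  })
}

// Hypothetical helper: drains a stream back into an array
async function readAll<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      out.push(value)
    }
  } finally {
    reader.releaseLock()
  }
  return out
}
```

Because the source enqueues in `pull()`, items are handed over only as the consumer asks for them, which is what keeps downstream work incremental.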

Examples

Basic Usage

typescript
import { collect } from 'ts-collect'

// Simple streaming
const numbers = collect([1, 2, 3, 4, 5])
const stream = numbers.stream()

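// Process the stream with an explicit reader
const reader = stream.getReader()
try {
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    console.log(value) // Logs each number in order
  }
} finally {
  // Release the lock so the stream can be read or cancelled elsewhere
  reader.releaseLock()
}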

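// Stream with async iteration. This requires a runtime where ReadableStream
// is async iterable (for example Node.js 18+); not every browser supports it.
const products = collect([
  { id: 1, name: 'Widget A' },
  { id: 2, name: 'Widget B' }
])

// processProduct is a placeholder for your own async handler
for await (const product of products.stream()) {
  await processProduct(product)
}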
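
The `for await` loop above relies on the runtime making ReadableStream async iterable. Where that is not available, a small async generator over a reader gives the same loop shape. This is a sketch; `iterateStream` is a hypothetical helper name, and it uses only standard `getReader()`, `read()`, `cancel()`, and `releaseLock()` calls.

```typescript
// Hypothetical fallback helper: wraps a reader in an async generator so a
// stream can be consumed with `for await` in runtimes that lack native support.
async function* iterateStream<T>(
  stream: ReadableStream<T>,
): AsyncGenerator<T, void, undefined> {
  const reader = stream.getReader()
  let finished = false
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) {
        finished = true
        return
      }
      yield value
    }
  } finally {
    // On early exit (break or a thrown error) cancel the source;
    // then always release the lock.
    if (!finished) await reader.cancel().catch(() => {})
    reader.releaseLock()
  }
}
```

With this in place, `for await (const product of iterateStream(products.stream()))` would behave like the loop above, assuming `stream()` returns a standard ReadableStream as documented.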

Working with Objects

typescript
interface Product {
  id: string
  name: string
  price: number
  description: string
}

const products = collect<Product>([
  {
    id: '1',
    name: 'Widget',
    price: 100,
    description: 'A fantastic widget'
  },
  // ... many more products
])

// Process products one at a time; updateProduct is a placeholder for your own async handler
const stream = products.stream()
const reader = stream.getReader()

try {
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    await updateProduct(value)
  }
} finally {
  reader.releaseLock()
}
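
As an alternative to the manual read loop, the standard `pipeTo()` method can drive the same per-item processing and manage the reader lock itself. The sketch below assumes only the standard Streams API; `forEachStreamed` is a hypothetical helper name, not part of ts-collect.

```typescript
// Hypothetical helper: forwards each streamed item to an async handler.
function forEachStreamed<T>(
  source: ReadableStream<T>,
  handler: (item: T) => Promise<void>,
): Promise<void> {
  return source.pipeTo(
    new WritableStream<T>({
      // The sink receives one item at a time; the next write does not
      // start until this promise settles.
      async write(item) {
        await handler(item)
      },
    }),
  )
}
```

In the example above this would read `await forEachStreamed(products.stream(), updateProduct)`. If the handler throws, the returned promise rejects and the pipe stops, so no `try`/`finally` around the lock is needed.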

Real-world Examples

Large Catalog Processor

typescript
interface CatalogItem {
  sku: string
  name: string
  description: string
  price: number
  images: string[]
  metadata: Record<string, unknown>
}

// Assumes the Collection type is imported from 'ts-collect'
class CatalogProcessor {
  async processCatalog(
    catalog: Collection<CatalogItem>,
    batchSize: number = 100
  ): Promise<{
    processed: number
    failed: string[]
  }> {
    const stream = catalog.stream()
    const reader = stream.getReader()

    let processed = 0 // Items attempted, including any that end up in `failed`
    const failed: string[] = []
    let batch: CatalogItem[] = []

    try {
      while (true) {
        const { done, value } = await reader.read()

        if (done) {
          if (batch.length > 0) {
            await this.processBatch(batch, failed)
            processed += batch.length
          }
          break
        }

        batch.push(value)
        if (batch.length >= batchSize) {
          await this.processBatch(batch, failed)
          processed += batch.length
          batch = []
        }
      }
    } finally {
      reader.releaseLock()
    }

    return { processed, failed }
  }

  private async processBatch(
    items: CatalogItem[],
    failed: string[]
  ): Promise<void> {
    await Promise.all(
      items.map(async item => {
        try {
          await this.processItem(item)
        } catch (error) {
          failed.push(item.sku)
          console.error(`Failed to process ${item.sku}:`, error)
        }
      })
    )
  }

  private async processItem(item: CatalogItem): Promise<void> {
    // Process individual catalog item
    await this.validateData(item)
    await this.enrichMetadata(item)
    await this.updateSearchIndex(item)
  }

  private async validateData(item: CatalogItem): Promise<void> {
    // Validation logic
  }

  private async enrichMetadata(item: CatalogItem): Promise<void> {
    // Metadata enrichment
  }

  private async updateSearchIndex(item: CatalogItem): Promise<void> {
    // Search index update
  }
}
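
The batching logic in `processCatalog` can also be factored into a reusable TransformStream, which keeps the read loop free of batch bookkeeping. This is a sketch under the assumption that `stream()` returns a standard ReadableStream; `batchTransform` is a hypothetical helper, not a ts-collect API.

```typescript
// Hypothetical helper: groups incoming items into arrays of up to `size`.
function batchTransform<T>(size: number): TransformStream<T, T[]> {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  let batch: T[] = []
  return new TransformStream<T, T[]>({
    transform(item, controller) {
      batch.push(item)
      if (batch.length >= size) {
        controller.enqueue(batch)
        batch = []
      }
    },
    // Emit the final, possibly smaller, batch when the source ends
    flush(controller) {
      if (batch.length > 0) controller.enqueue(batch)
    },
  })
}
```

With this helper, `catalog.stream().pipeThrough(batchTransform<CatalogItem>(batchSize))` would yield arrays of catalog items, and each array could be passed directly to `processBatch`.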

Order History Exporter

typescript
interface Order {
  id: string
  date: Date
  customer: string
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  total: number
}

// Assumes the Collection type is imported from 'ts-collect'
class OrderExporter {
  async exportOrders(
    orders: Collection<Order>,
    outputStream: WritableStream<string>
  ): Promise<{
    exported: number
    errors: Array<{ id: string; error: string }>
  }> {
    const writer = outputStream.getWriter()
    const stream = orders.stream()
    const reader = stream.getReader()

    let exported = 0
    const errors: Array<{ id: string; error: string }> = []

    try {
      // Write the CSV header inside try so the finally block still runs if it fails
      await writer.write(this.getHeaderRow())

      while (true) {
        const { done, value } = await reader.read()

        if (done) break

        try {
          await writer.write(this.formatOrder(value))
          exported++
        } catch (error) {
          errors.push({
            id: value.id,
            error: error instanceof Error ? error.message : 'Unknown error'
          })
        }
      }
    } finally {
      reader.releaseLock()
      await writer.close()
    }

    return { exported, errors }
  }

  private getHeaderRow(): string {
    return 'Order ID,Date,Customer,Items,Total\n'
  }

  private formatOrder(order: Order): string {
    return `${order.id},${order.date.toISOString()},${order.customer},` +
           `${this.formatItems(order.items)},${order.total}\n`
  }

  private formatItems(items: Order['items']): string {
    return items
      .map(item => `${item.productId}(${item.quantity})`)
      .join(';')
  }
}
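
Note that `formatOrder` writes fields verbatim, so a customer name containing a comma, quote, or line break would corrupt the row. A minimal escaping helper in the style of RFC 4180 might look like the sketch below; `escapeCsvField` is a hypothetical name and is not part of the exporter above.

```typescript
// Hypothetical helper: quotes a CSV field only when it contains a comma,
// double quote, or line break, doubling any embedded double quotes.
function escapeCsvField(value: string): string {
  if (/[",\r\n]/.test(value)) {
    return `"${value.replace(/"/g, '""')}"`
  }
  return value
}
```

In `formatOrder`, wrapping `order.customer` as `escapeCsvField(order.customer)` would be the most likely place to apply it, since the other fields are generated values.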

Type Safety

typescript
interface TypedItem {
  id: number
  value: string
}

const items = collect<TypedItem>([
  { id: 1, value: 'A' },
  { id: 2, value: 'B' }
])

// Type-safe streaming
const stream: ReadableStream<TypedItem> = items.stream()

// Process with type safety
const reader = stream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break

  // After the done check, TypeScript narrows value to TypedItem
  const id: number = value.id
  const val: string = value.value
}
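
Types also carry through standard stream transforms. The sketch below shows a typed mapping step built on TransformStream; `mapStream` and `toLabel` are hypothetical helpers, and the `TypedItem` interface is repeated so the snippet stands alone.

```typescript
interface TypedItem {
  id: number
  value: string
}

// Hypothetical helper: applies fn to every item, changing the stream's type
// from ReadableStream<I> to ReadableStream<O> when used with pipeThrough().
function mapStream<I, O>(fn: (item: I) => O): TransformStream<I, O> {
  return new TransformStream<I, O>({
    transform(item, controller) {
      controller.enqueue(fn(item))
    },
  })
}

// The compiler checks that the callback only uses fields TypedItem declares
const toLabel = mapStream((item: TypedItem) => `${item.id}:${item.value}`)
```

Assuming `items.stream()` returns `ReadableStream<TypedItem>` as shown above, `items.stream().pipeThrough(toLabel)` would be typed as `ReadableStream<string>`.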

Return Value

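  • Returns a ReadableStream<T> of the collection's items
  • Yields one item per read
  • Preserves the item type T
  • Supports async iteration (for await...of) in runtimes where ReadableStream is async iterable
  • Preserves the collection's original item order
  • Keeps per-item work incremental, which helps bound the memory used by downstream processing of large datasets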
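
Because items are yielded one at a time, a consumer can stop early without touching the rest of the data. The sketch below reads a fixed number of items and then cancels the stream; `takeFirst` is a hypothetical helper built only on the standard reader API.

```typescript
// Hypothetical helper: reads at most `count` items, then cancels the stream
// so the source knows no further items are needed.
async function takeFirst<T>(
  stream: ReadableStream<T>,
  count: number,
): Promise<T[]> {
  const reader = stream.getReader()
  const taken: T[] = []
  try {
    while (taken.length < count) {
      const { done, value } = await reader.read()
      if (done) break
      taken.push(value)
    }
    // Resolves immediately if the stream has already closed
    await reader.cancel()
  } finally {
    reader.releaseLock()
  }
  return taken
}
```

For example, `await takeFirst(collect(items).stream(), 10)` would return the first ten items in their original order, assuming `stream()` behaves as described in the list above.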

Common Use Cases

1. Large Data Processing

  • Catalog updates
  • Order processing
  • Log analysis
  • Data migration
  • Batch operations

2. Data Export

  • CSV generation
  • Report creation
  • Data dumps
  • Backup creation
  • Archive generation

3. Import Operations

  • Data import
  • Catalog updates
  • Price updates
  • Inventory sync
  • Product updates

4. Content Processing

  • Image processing
  • Content updates
  • Media handling
  • Document processing
  • File operations

5. Batch Operations

  • Order processing
  • Invoice generation
  • Email sending
  • Report creation
  • Data updates

6. Data Migration

  • System updates
  • Platform migration
  • Data transfer
  • Content moving
  • Archive creation

7. Resource Management

  • Memory optimization
  • Processing control
  • Resource utilization
  • Load management
  • Performance tuning

8. Reporting

  • Large reports
  • Data analysis
  • Statistics generation
  • Performance metrics
  • Usage tracking

9. Integration

  • API integration
  • System sync
  • Data exchange
  • Service integration
  • Platform connection

10. Performance Optimization

  • Memory usage
  • Processing speed
  • Resource efficiency
  • Load handling
  • Scalability

Released under the MIT License.