Skip to content

fromStream Method

The fromStream() method creates a collection from a ReadableStream. This method is useful for converting streaming data into a collection that can be manipulated using collection operations.

Basic Syntax

typescript
collect(items).fromStream<U>(stream: ReadableStream<U>): Promise<Collection<U>>

Examples

Basic Usage

typescript
import { collect } from 'ts-collect'

// Create collection from number stream
const numberStream = new ReadableStream({
  start(controller) {
    [1, 2, 3, 4, 5].forEach(n => controller.enqueue(n))
    controller.close()
  }
})

const numbers = await collect([]).fromStream(numberStream)
console.log(numbers.all())  // [1, 2, 3, 4, 5]

// Create from string stream
const textStream = new ReadableStream({
  start(controller) {
    ['hello', 'world'].forEach(s => controller.enqueue(s))
    controller.close()
  }
})

const words = await collect([]).fromStream(textStream)

Working with Objects

typescript
interface Product {
  id: string
  name: string
  price: number
}

// Create product stream
const productStream = new ReadableStream<Product>({
  start(controller) {
    const products = [
      { id: '1', name: 'Widget', price: 100 },
      { id: '2', name: 'Gadget', price: 200 }
    ]
    products.forEach(p => controller.enqueue(p))
    controller.close()
  }
})

// Convert to collection
const products = await collect<Product>([]).fromStream(productStream)

Real-world Examples

Data Import System

typescript
interface ImportRecord {
  line: number
  data: Record<string, unknown>
  errors: string[]
}

class DataImporter {
  async importFromStream(
    dataStream: ReadableStream<ImportRecord>
  ): Promise<{
    imported: Collection<ImportRecord>
    success: number
    failed: number
    errors: Map<number, string[]>
  }> {
    const records = await collect<ImportRecord>([])
      .fromStream(dataStream)

    const validRecords = records.filter(record => record.errors.length === 0)
    const invalidRecords = records.filter(record => record.errors.length > 0)

    const errors = new Map<number, string[]>()
    invalidRecords.each(record => {
      errors.set(record.line, record.errors)
    })

    return {
      imported: validRecords,
      success: validRecords.count(),
      failed: invalidRecords.count(),
      errors
    }
  }

  async processImport(
    dataStream: ReadableStream<ImportRecord>
  ): Promise<void> {
    const { imported, failed, errors } = await this.importFromStream(dataStream)

    if (failed > 0) {
      console.error(`Failed to import ${failed} records:`)
      errors.forEach((errs, line) => {
        console.error(`Line ${line}:`, errs)
      })
    }

    await this.saveImportedRecords(imported)
  }

  private async saveImportedRecords(
    records: Collection<ImportRecord>
  ): Promise<void> {
    // Save records logic
  }
}

File Upload Processor

typescript
interface FileChunk {
  index: number
  data: Uint8Array
  filename: string
}

class FileProcessor {
  async processUploadStream(
    chunkStream: ReadableStream<FileChunk>
  ): Promise<{
    processedFiles: string[]
    totalSize: number
    errors: Array<{ file: string; error: string }>
  }> {
    const chunks = await collect<FileChunk>([])
      .fromStream(chunkStream)

    const fileGroups = chunks
      .sortBy('index')
      .groupBy('filename')

    const processedFiles: string[] = []
    const errors: Array<{ file: string; error: string }> = []
    let totalSize = 0

    for (const [filename, fileChunks] of fileGroups.entries()) {
      try {
        const combinedData = this.combineChunks(fileChunks)
        await this.processFile(filename, combinedData)
        processedFiles.push(filename)
        totalSize += combinedData.length
      } catch (error) {
        errors.push({
          file: filename,
          error: error instanceof Error ? error.message : 'Unknown error'
        })
      }
    }

    return {
      processedFiles,
      totalSize,
      errors
    }
  }

  private combineChunks(chunks: Collection<FileChunk>): Uint8Array {
    const totalSize = chunks.sum(chunk => chunk.data.length)
    const result = new Uint8Array(totalSize)
    let offset = 0

    chunks.each(chunk => {
      result.set(chunk.data, offset)
      offset += chunk.data.length
    })

    return result
  }

  private async processFile(
    filename: string,
    data: Uint8Array
  ): Promise<void> {
    // File processing logic
  }
}

Advanced Usage

Real-time Data Processor

typescript
interface DataPoint {
  timestamp: Date
  value: number
  source: string
}

class RealTimeProcessor {
  async processDataStream(
    dataStream: ReadableStream<DataPoint>,
    windowSize: number = 100
  ): Promise<{
    summary: Map<string, {
      average: number
      max: number
      min: number
    }>
    alerts: Array<{
      source: string
      timestamp: Date
      reason: string
    }>
  }> {
    const dataPoints = await collect<DataPoint>([])
      .fromStream(dataStream)

    const bySource = dataPoints.groupBy('source')
    const summary = new Map()
    const alerts: Array<{
      source: string
      timestamp: Date
      reason: string
    }> = []

    bySource.forEach((points, source) => {
      const values = points.pluck('value')
      const average = values.avg()
      const max = values.max() ?? 0
      const min = values.min() ?? 0

      summary.set(source, { average, max, min })

      // Check for anomalies
      points.each(point => {
        if (point.value > average * 2) {
          alerts.push({
            source,
            timestamp: point.timestamp,
            reason: 'Value exceeds 2x average'
          })
        }
      })
    })

    return { summary, alerts }
  }
}

Type Safety

typescript
interface TypedItem {
  id: number
  value: string
}

// Create typed stream
const stream = new ReadableStream<TypedItem>({
  start(controller) {
    controller.enqueue({ id: 1, value: 'A' })
    controller.enqueue({ id: 2, value: 'B' })
    controller.close()
  }
})

// Type-safe collection creation
const items = await collect<TypedItem>([])
  .fromStream(stream)

// TypeScript knows the type
const firstValue: string = items.first()?.value

Return Value

  • Returns a Promise that resolves to a Collection
  • Collection contains all stream items
  • Maintains item order from stream
  • Preserves item types
  • Supports all collection methods
  • Memory efficient processing

Common Use Cases

1. Data Import

  • File uploads
  • Bulk imports
  • Data migration
  • Content transfer
  • System integration

2. Real-time Processing

  • Event streams
  • Sensor data
  • Log processing
  • Metrics collection
  • System monitoring

3. File Processing

  • Upload handling
  • Chunk processing
  • Document parsing
  • Media processing
  • Content streaming

4. Data Transformation

  • Format conversion
  • Data cleaning
  • Content processing
  • Structure mapping
  • Type conversion

5. Batch Operations

  • Record processing
  • Bulk updates
  • Data validation
  • Content moderation
  • System updates

6. Integration

  • API consumption
  • Service integration
  • Data synchronization
  • Platform connection
  • System bridging

7. Analytics

  • Data collection
  • Metric processing
  • Log analysis
  • Event tracking
  • Performance monitoring

8. Content Management

  • Content imports
  • Media processing
  • Document handling
  • Asset management
  • Resource processing

9. System Migration

  • Data transfer
  • Content migration
  • System updates
  • Platform moves
  • Service transitions

10. Performance Optimization

  • Memory efficiency
  • Resource management
  • Load handling
  • Process scaling
  • Stream processing

Released under the MIT License.