Skip to content

Resume reading a stream

Chris Chang edited this page Apr 28, 2017 · 1 revision
const fs = require('fs')
const AWS = require('aws-sdk')
const { KinesisReadable } = require('kinesis-streams')

const client = new AWS.Kinesis()

const options = {}

// Set the beginning sequence number if we can
try {
  const lastSequenceNumber = fs.readFileSync('kinesis.watermark', {encoding: 'utf8'})
  options.ShardIteratorType = 'AFTER_SEQUENCE_NUMBER'
  options.StartingSequenceNumber = lastSequenceNumber
  console.log('\x1b[36mStarting after %s\x1b[0m', lastSequenceNumber)
} catch (err) {
  if (err.code !== 'ENOENT') {
    console.error(err)
    process.exit(1)
  }
  console.log('\x1b[36mStarting from the beginning of the stream\x1b[0m')
}

const reader = new KinesisReadable(client, 'sparrow-pageview-ci', options)

// Save the last sequence number we saw
reader.on('checkpoint', (sequenceNumber) => {
  fs.writeFile('kinesis.watermark', sequenceNumber, (err) => {
    if (err) {
      console.error(err)
      process.exit(1)
    }

    console.log('\n\x1b[36mSAVED: %s\x1b[0m', sequenceNumber)
  })
})

reader.pipe(process.stdout)
Clone this wiki locally