The Promise
of rest-core









           http://godfat.org/slide/2015-01-13-rest-core-promise/

Who?

Lin Jen-Shin (godfat)

Table of Contents

RubyConf.TW 2011

RubyConf.TW 2012

Table of Contents

The API / Use Cases

Futures / Synchronous / Blocking

Print the Name


puts client.get('ruby')['name']
  
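
These examples assume a client built beforehand. A minimal setup sketch using RC::Builder's DSL (the middleware names follow rest-core's README; the site URL is hypothetical):


require 'rest-core'

# build a client class from middleware
Client = RC::Builder.client do
  use RC::DefaultSite , 'https://api.example.com/'
  use RC::JsonResponse, true # parse JSON bodies
end

client = Client.new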

Futures / Synchronous / Blocking

Print the Names (Bad)


puts client.get('ruby')['name'] # -- this blocks
puts client.get('opal')['name']

Non-blocking (Better)


puts %w[ruby opal].map{ |n| client.get(n) }.
                   map{ |r| r['name'] }

Futures / Synchronous / Blocking

Or Easier to Read:


a, b = client.get('ruby'), client.get('opal')
puts a['name'], b['name']

Non-blocking (Better)


puts %w[ruby opal].map{ |n| client.get(n) }.
                   map{ |r| r['name'] }

Futures / Synchronous / Blocking

Fire and Forget


client.post('ruby/cut', style: 'brilliant')

Fire and Call


# passing a block, even an empty one,
# makes the request asynchronous
client.post('ruby/cut', style: 'brilliant'){}

The API / Use Cases

Callbacks / Asynchronous / Non-blocking

Fire and Call


client.post('ruby/cut', style: 'brilliant'){}


Callbacks / Asynchronous / Non-blocking

Fire and Call


client.post('ruby/cut', style: 'brilliant') do |r|
  puts r['status']
end

Callbacks / Asynchronous / Non-blocking

Fire and Rescue


client.post('ruby/cut', style: 'brilliant') do |r|
  puts r.kind_of?(Exception)
end
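
Since the callback runs outside the caller's stack, raising there would be useless to the caller; errors are passed to the callback as values instead. A sketch combining both cases:


client.post('ruby/cut', style: 'brilliant') do |r|
  if r.kind_of?(Exception)
    warn "cut failed: #{r}"
  else
    puts r['status']
  end
end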

Callbacks / Asynchronous / Non-blocking

Callback Hell?


c = client
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    c.get("#{f.first['name']}/friends") do |ff|
      c.post("#{ff.first['name']}/mails", 'Hi')
    end
  end
end

Callbacks / Asynchronous / Non-blocking

Callback Hell? Eliminate One


c = client
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    c.post("#{ff.first['name']}/mails", 'Hi')
  end
end

Callbacks / Asynchronous / Non-blocking

Callback Hell? But This Blocks!


c = client
%w[ruby opal].map do |n|
  c.get("#{n}/friends")
end.map do |f|
  # f.first forces the future here, so each
  # stage blocks before the next one fires
  c.get("#{f.first['name']}/friends")
end.map do |ff|
  c.post("#{ff.first['name']}/mails", 'Hi')
end

Callbacks / Asynchronous / Non-blocking

We Still Need Callbacks

The API / Use Cases

Wait! It's not done yet

RC::Client#wait


c = client
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    c.post("#{ff.first['name']}/mails", 'Hi')
  end
end


Wait! It's not done yet

RC::Client#wait


r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.wait       # only for this client instance
puts r

Wait! It's not done yet

RC::Client.wait


r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.class.wait # for all requests from this class
puts r

Wait! It's not done yet

RC::Client.wait


r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.class.wait # useful for graceful shutdown
puts r

The API / Use Cases

Promise Chain: Promise#then

Without Waiting... Change This:


r = []
%w[ruby opal].each do |n|
  c.get("#{n}/friends") do |f|
    ff = c.get("#{f.first['name']}/friends")
    r << c.post("#{ff.first['name']}/mails", 'Hi')
  end
end
c.wait
puts r

Promise Chain: Promise#then

Without Waiting... To This:


r = %w[ruby opal].map do |n|
  c.get("#{n}/friends", {}, RC::ASYNC => true,
        RC::RESPONSE_KEY => RC::PROMISE).
        then do |res|
    f = res[RC::RESPONSE_BODY]
    ff = c.get("#{f.first['name']}/friends")
    st = c.post("#{ff.first['name']}/mails", 'Hi')
    res.merge(RC::RESPONSE_BODY => st)
  end.future_response[RC::RESPONSE_BODY]
end
puts r

Promise Chain: Promise#then

Why: It's Not Blocking Here


r = %w[ruby opal].map do |n|
  c.get("#{n}/friends", {}, RC::ASYNC => true,
        RC::RESPONSE_KEY => RC::PROMISE).
        then do |res|
    f = res[RC::RESPONSE_BODY]
    ff = c.get("#{f.first['name']}/friends")
    st = c.post("#{ff.first['name']}/mails", 'Hi')
    res.merge(RC::RESPONSE_BODY => st)
  end.future_response[RC::RESPONSE_BODY]
end
puts r

# RC::Github#all

def all p, query={}, o={}
  q = {:per_page => MAX_PER_PAGE}.merge(query)
  r = get(p, q, o.merge(RC::ASYNC => true,
          RESPONSE_KEY => PROMISE)).then{ |res|
    b = res[RESPONSE_BODY] +
        page_range(res).map{ |page|
          get(p, q.merge(:page => page),
            o.merge(RESPONSE_KEY=>RESPONSE_BODY))
        }.inject([], &:+)
    res.merge(RESPONSE_BODY => b)
  }.future_response
  if block_given?
    yield(r[response_key(o)]); self
  else
    r[response_key(o)]
  end
end
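
A usage sketch, assuming rest-more's RC::Github (where this all comes from) and that unauthenticated access works for public endpoints:


require 'rest-more'

github = RC::Github.new
repos  = github.all('users/godfat/repos')
puts repos.size # every page, merged into one array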

The API / Use Cases

Promise.claim

Forging a Response,
for the RC::Cache middleware


RC::Promise.
  claim(res, k, body, 200, 'Header' => 'Here').
  future_response

Promise.claim

Forging a Response,
for the RC::Cache middleware


def RC::Promise.claim env, k=RC.id,
                      body, status, headers
  promise = new(env, k)
  promise.fulfill(body, status, headers)
  promise
end
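
A sketch of how a cache middleware could use claim to answer without touching the network (the middleware wiring and cache lookup here are schematic, not RC::Cache's actual code):


class TinyCache
  def initialize app, store={}
    @app, @store = app, store
  end

  # k is the continuation callback, as above
  def call env, &k
    if body = @store[env[RC::REQUEST_PATH]]
      # forge a fulfilled response: no request
      RC::Promise.claim(env, k, body, 200,
                        'X-Cache' => 'HIT').
        future_response
    else
      @app.call(env, &k)
    end
  end
end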

Table of Contents

Streaming, the EventSource

Streaming, the EventSource

SSE, Server-Sent Events

is a technology where a browser receives automatic updates from a server via an HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5 by the W3C.

Said Wikipedia.

Streaming, the EventSource

SSE, Server-Sent Events


var es = new EventSource('/api/sse');
es.onmessage = function (event) {
  console.log(event.data);
};

Streaming, the EventSource

SSE with Rack Hijacking


get '/api/sse' do # (jellyfish's/raggi's example)
  headers_merge(
    'Content-Type' => 'text/event-stream',
    'rack.hijack'  => lambda do |sock|
      loop do
        sock.write("data: Aloha!\n\n")
        sleep 5
      end
    end)
end

Streaming, the EventSource

Streaming, the EventSource

Firebase Daemon (rest-firebase)


require 'rest-firebase'

es = RestFirebase.new(auth: false).event_source(
       'https://SampleChat.firebaseIO-demo.com/')

Streaming, the EventSource

Firebase Daemon (rest-firebase)


es.onerror do |error|
  puts "ERROR: #{error}"
end

es.onreconnect do
  !!@start # always reconnect unless stopping
end

es.onmessage do |event, data|
  puts "EVENT: #{event}, DATA: #{data}"
end

Streaming, the EventSource

Firebase Daemon (rest-firebase)


@start = true
es.start

rd, wr = IO.pipe
Signal.trap('INT') do # intercept ctrl-c
  @start = false      # stop reconnecting
  es.close            # close socket
  es.wait             # wait for shutting down
  wr.puts             # unblock main thread
end
rd.gets               # main thread blocks here

Streaming, the EventSource

Firebase Daemon (rest-firebase)


curl -X POST -d '{"message": "Hi!"}' \
https://SampleChat.firebaseIO-demo.com/ruby.json

EVENT: put, DATA:
  {"path"=>"/ruby/-JfOWDn1LQJn-ng9EcF6",
   "data"=>{"message"=>"Hi!"}}

Table of Contents

Some Background...

Race Condition

is the behavior of a software system where the output is dependent on the sequence or timing of other uncontrollable events.


Said Wikipedia.

Mutex (lock)

is a synchronization mechanism for enforcing limits on access to a resource in an environment where there are many threads of execution.

Said Wikipedia.

Race Condition


@i = 0         #
#---thread a---#---thread b-----

  tmp = @i
  @i  = tmp + 1


                   tmp = @i
                   @i  = tmp + 1

# -------------#----------------
@i # => 2      #

Race Condition


@i = 0         #
#---thread a---#---thread b-----

  tmp = @i

                   tmp = @i
  @i  = tmp + 1

                   @i  = tmp + 1

# -------------#----------------
@i # => 1      #

Mutex (lock)


@i = 0         #
#---thread a---#---thread b-----
synchronize do
  tmp = @i
  @i  = tmp + 1
end
                 synchronize do
                   tmp = @i
                   @i  = tmp + 1
                 end
# -------------#----------------
@i # => 2      #
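
The same picture, runnable (a sketch; without the synchronize the count can come out below 200_000 on truly parallel Rubies such as JRuby):


require 'thread'

mutex = Mutex.new
@i    = 0

2.times.map do
  Thread.new do
    100_000.times do
      mutex.synchronize do
        tmp = @i
        @i  = tmp + 1
      end
    end
  end
end.each(&:join)

p @i # => 200_000; drop the synchronize
     # and increments can be lost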

Some Background...

ConditionVariable (monitor)

is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true.

Said Wikipedia.

ConditionVariable (monitor)


#---thread a---#---thread b----------------------
synchronize do # <= a acquires lock
  condv.wait   # <= a releases lock and sleeps
                 synchronize do # b acquires lock
                   @i = 1
                   condv.signal # a wakes up
                 end            # b releases lock
               # <= a acquires lock
  puts @i      # => 1
end            # <= a releases lock
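
The same dance with the real API (a minimal runnable sketch; ConditionVariable#wait takes the mutex so it can be released while sleeping, and the loop guards against spurious wakeups):


require 'thread'

mutex = Mutex.new
condv = ConditionVariable.new
@i    = 0

a = Thread.new do
  mutex.synchronize do
    condv.wait(mutex) while @i.zero?
    puts @i # => 1
  end
end

mutex.synchronize do
  @i = 1
  condv.signal
end
a.join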

Some Background...

WeakRef (weak reference)

is a reference that does not protect the referenced object from collection by a garbage collector, unlike a strong reference.


Said Wikipedia.

WeakRef (weak reference)


require 'weakref'
ref = WeakRef.new(Object.new)

GC.start # collecting ^^^^^^

p ref.weakref_alive? # => nil
p ref # raise WeakRef::RefError
# Invalid Reference - probably recycled

Table of Contents

Concurrency Model


Gotcha!

Weird Backtrace for Exceptions


def f n=1
  if n.zero?
    # Thread.new do
      raise '' rescue p $!.backtrace
    # end.join
  else
    f(n-1)
  end
end
f # => ["-:4:in `f'", "-:7:in `f'",
  #     "-:10:in `<main>'"]

Weird Backtrace for Exceptions


def f n=1
  if n.zero?
    Thread.new do
      raise '' rescue p $!.backtrace
    end.join
  else
    f(n-1)
  end
end
f # => ["-:4:in `block in f'"]
  #

Retaining Backtrace to Ease Debugging


def defer
  if pool_size < 0 # none / blocking
  else
    # retain the backtrace so far
    backtrace = caller + self.class.backtrace
    if pool_size > 0 # thread pool
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end

Retaining Backtrace to Ease Debugging

Concurrency Model

Gotcha!


at_exit do
  RC::Universal.shutdown
end

def shutdown # RC::Client.shutdown
  thread_pool.shutdown
  wait
end

def shutdown # RC::ThreadPool#shutdown
  workers.size.times{ trim(true) }
  workers.first.join && trim(true) until
    workers.empty?
  queue.clear
end

Concurrency Model

Gotcha!

Beware of Deadlock!


RC::Universal.pool_size = 1
client = RC::Universal.new
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name']
end

Beware of Deadlock!


RC::Universal.pool_size = 1
client = RC::Universal.new
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name'] # DEADLOCK!
end

Need at least two threads!
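
A minimal fix sketch: with a second worker, the nested, blocking get can be served while the callback occupies the first one (deeper nesting would need more workers):


RC::Universal.pool_size = 2
client = RC::Universal.new
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name'] # ok now
end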

Beware of Timeout!


RC::Universal.pool_size = 1
client = RC::Universal.new(:timeout => 10)
client.get('ruby/friends') do |response|
  path = "#{response.first['name']}/friends"
  puts client.get(path).first['name'] # TIMEOUT!
end

With a timeout set, the deadlock raises a timeout error instead of hanging forever

Beware of Timeout!


RC::Universal.pool_size = 1
client = RC::Universal.new(:timeout => 10)

1000.times do |i|
  client.get("ruby/#{i}"){ |res| puts res }
end

If each request needs > 10ms, later requests time out (1000 requests through 1 worker takes > 10s)

Beware of Timeout!


RC::Universal.pool_size = 10
client = RC::Universal.new(:timeout => 10)

1000.times do |i|
  client.get("ruby/#{i}"){ |res| puts res }
end

If each request needs > 100ms, later requests still time out (100 requests per worker takes > 10s)

Concurrency Model

Gotcha!

Inaccurate Timeout

Decrease the Polling Interval


RC::Timer.interval = 0.5 # Default 1 second
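
Why it is inaccurate, assuming a timer thread that polls every interval: a timeout can fire up to one interval late.


# :timeout => 10 with interval 1.0
#   => cancelled between ~10s and ~11s
# :timeout => 10 with interval 0.5
#   => cancelled between ~10s and ~10.5s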

Concurrency Model

Wait! Detail


# Wait for all the requests
# to be done for this client
def wait ps=promises, m=mutex
  return self if ps.empty?
  current_promises = nil
  m.synchronize do
    current_promises = ps.dup
    ps.clear
  end
  current_promises.each do |p|
    next unless p.weakref_alive?
    begin
      p.wait
    rescue WeakRef::RefError
    end # it's gc'ed after we think it's alive
  end
  # callbacks may have queued new promises
  # while we were waiting, so check again
  wait(ps, m)
end

Concurrency Model

Wait! Detail

Concurrency Model - Promise#wait


# called in client thread (client.wait)
def wait
  # condv.broadcast wakes every waiter, so we
  # might be awakened for some other future:
  # keep waiting until this one is done
  mutex.synchronize do
    condv.wait(mutex) until done?
  end unless done?
end

Concurrency Model

Wait! Detail

Concurrency Model - Promise#fulfilling


def fulfilling body, status, headers, socket=nil
  self.body, self.status,
  self.headers, self.socket =
    body, status, headers, socket
  # under ASYNC callback, should call immediately
  callback if immediate
ensure
  # client or response might be waiting
  condv.broadcast
end

Concurrency Model - Promise#rejecting


def rejecting error
  self.error = if error.kind_of?(Exception)
                 error
               else
                 Error.new(error || 'unknown')
               end
  fulfilling('', 0, {})
end

Concurrency Model

Wait! Detail

Concurrency Model - Future#method_missing


def method_missing msg, *args, &block
  # any message forces the promise (yield) and
  # is delegated to the real response value
  @promise.yield[@target].
    __send__(msg, *args, &block)
end

Concurrency Model - Promise#yield


# called in client thread
# (from the future (e.g. body))
def yield
  wait
  callback
end
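
Putting Future#method_missing and Promise#yield together: nothing blocks until the future's value is actually touched. A sketch:


f = client.get('ruby') # returns a future at once
# ... other work runs while the request flies ...
f['name'] # method_missing -> Promise#yield
          # -> wait until fulfilled -> callback
          # -> index into the real body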

Concurrency Model

Promise Detail

Concurrency Model - None / Blocking (for debugging purposes)

RC::Universal.pool_size = -1

def defer
  if pool_size < 0
    # set working thread
    self.thread = Thread.current
    # avoid any exception and do the job
    protected_yield{ yield }
  else
    # ...
  end
end

Concurrency Model - Thread Spawn (default)

RC::Universal.pool_size =  0
def defer
  if pool_size < 0 # none / blocking
  else
    # retain the backtrace so far
    backtrace = caller + self.class.backtrace
    if pool_size > 0 # thread pool
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end

Concurrency Model - Thread Pool (Throttling)

RC::Universal.pool_size = 10

Concurrency Model - Thread Pool (Throttling)

if pool_size > 0
  mutex.synchronize do
    # still timing it out if the task
    # never processed
    env[TIMER].on_timeout{ cancel_task } if env[TIMER]
    self.task = client_class.thread_pool.
      defer(mutex) do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
        Thread.current[:backtrace] = nil
      end
  end
else # thread spawn
end
def defer
  if pool_size < 0 # negative number for blocking call
    self.thread = Thread.current # set working thread
    protected_yield{ yield } # avoid any exception and do the job
  else
    backtrace = caller + self.class.backtrace # retain the backtrace so far
    if pool_size > 0
      mutex.synchronize do
        # still timing it out if the task never processed
        env[TIMER].on_timeout{ cancel_task } if env[TIMER]
        self.task = client_class.thread_pool.defer(mutex) do
          Thread.current[:backtrace] = backtrace
          protected_yield{ yield }
          Thread.current[:backtrace] = nil
        end
      end
    else
      self.thread = Thread.new do
        Thread.current[:backtrace] = backtrace
        protected_yield{ yield }
      end
    end
  end
end

Concurrency Model

Promise Detail

def protected_yield
  if env[TIMER]
    timeout_protected_yield{ yield }
  else
    yield
  end
rescue Exception => e # could be Timeout::Error
  mutex.synchronize do
    self.class.set_backtrace(e)
    if done? # log user callback error
      callback_error(e)
    else # IOError, SystemCallError, etc
      begin
        rejecting(e) # would call user callback
      rescue Exception => f
        callback_error(f) # log user callback error
      end
    end
  end
end

Promise#timeout_protected_yield


def timeout_protected_yield
  # timeout might already be set for
  # thread_pool (pool_size > 0)
  env[TIMER].on_timeout{ cancel_task } unless
    env[TIMER].timer
  yield
ensure
  env[TIMER].cancel
end

Concurrency Model

Promise Detail


def cancel_task bt=nil
  mutex.synchronize do
    next if done? # don't cancel if it's done
    if t = thread || task.thread
      # raise Timeout::Error to working thread
      t.raise(env[TIMER].error)
    else # task was queued and never started,
      begin # just cancel it and fulfil the
        task.cancel # promise with Timeout::Error
        rejecting(env[TIMER].error)
      rescue Exception => e
        # log user callback error
        callback_error(e) do
          e.set_backtrace(e.backtrace + (bt||[]))
        end
      end
    end
  end
end

Concurrency Model

ThreadPool Detail

Concurrency Model - ThreadPool#defer


def defer mutex=nil, &job
  task = Task.new(job, mutex)
  queue << task
  spawn_worker if waiting == 0 &&
                  workers.size < max_size
  task
end

Concurrency Model

ThreadPool Detail

Concurrency Model - ThreadPool#spawn_worker


def spawn_worker
  workers << Thread.new{
    task = nil
    begin
      mutex.synchronize{ @waiting += 1 }
      task = queue.pop(idle_time)
      mutex.synchronize{ @waiting -= 1 }
    end while task.call(Thread.current)
    mutex.synchronize do
      workers.delete(Thread.current)
    end
  }
end

Concurrency Model

ThreadPool Detail

Concurrency Model - ThreadPool::Task


class Task < Struct.new(:job, :mutex, :thread,
                        :cancelled)
  # this should never fail
  def call working_thread
    mutex.synchronize do
      return if cancelled
      self.thread = working_thread
    end
    job.call
    true
  end
  # we should probably synchronize this, too!
  def cancel; self.cancelled = true; end
end
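
Addressing that last comment, a sketch of a synchronized cancel (a hypothetical variant, not rest-core's code), so cancelling can't interleave with a worker picking the task up:


def cancel
  mutex.synchronize{ self.cancelled = true }
end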

Q?


                            https://github.com/godfat/rest-core






gem install rest-core