Sample compute node ping script

When a new elastic compute node is booted, it needs to contact Arvados to register itself. Here is an example ping script to run on boot.

#!/usr/bin/env ruby

require 'rubygems'

require 'cgi'
require 'fileutils'
require 'json'
require 'net/https'
require 'socket'
require 'syslog'

class ComputeNodePing
  @@NODEDATA_DIR = "/var/tmp/arv-node-data"
  @@PUPPET_CONFFILE = "/etc/puppet/puppet.conf"
  @@HOST_STATEFILE = "/var/run/arvados-compute-ping-hoststate.json"

  def initialize(args, stdout, stderr)
    @stdout = stdout
    @stderr = stderr
    @stderr_loglevel = ((args.first == "quiet") ?
                        Syslog::LOG_ERR : Syslog::LOG_DEBUG)
    @puppet_disabled = false
    @syslog = Syslog.open("arvados-compute-ping",
                          Syslog::LOG_CONS | Syslog::LOG_PID,
                          Syslog::LOG_DAEMON)
    @puppetless = File.exist?('/compute-node.puppetless')

    begin
      prepare_ping
      load_puppet_conf unless @puppetless
      begin
        @host_state = JSON.parse(IO.read(@@HOST_STATEFILE))
      rescue Errno::ENOENT
        @host_state = nil
      end
    rescue
      @syslog.close
      raise
    end
  end

  def send
    pong = send_raw_ping

    if pong["hostname"] and pong["domain"] and pong["first_ping_at"]
      if @host_state.nil?
        @host_state = {
          "fqdn" => (Socket.gethostbyname(Socket.gethostname).first rescue nil),
          "resumed_slurm" =>
            ["busy", "idle"].include?(pong["crunch_worker_state"]),
        }
        update_host_state({})
      end

      if hostname_changed?(pong)
        disable_puppet unless @puppetless
        rename_host(pong)
        update_host_state("fqdn" => fqdn_from_pong(pong),
                          "resumed_slurm" => false)
      end

      unless @host_state["resumed_slurm"]
        run_puppet_agent unless @puppetless
        resume_slurm_node(pong["hostname"])
        update_host_state("resumed_slurm" => true)
      end
    end

    log("Last ping at #{pong['last_ping_at']}")
  end

  def cleanup
    enable_puppet if @puppet_disabled and not @puppetless
    @syslog.close
  end

  private

  def log(message, level=Syslog::LOG_INFO)
    @syslog.log(level, message)
    if level <= @stderr_loglevel
      @stderr.write("#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{message}\n")
    end
  end

  def abort(message, code=1)
    log(message, Syslog::LOG_ERR)
    exit(code)
  end

  def run_and_check(cmd_a, accept_codes, io_opts, &block)
    result = IO.popen(cmd_a, "r", io_opts, &block)
    unless accept_codes.include?($?.exitstatus)
      abort("#{cmd_a} exited #{$?.exitstatus}")
    end
    result
  end

  DEFAULT_ACCEPT_CODES=[0]
  def check_output(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
    # Run a command, check the exit status, and return its stdout as a string.
    run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
      pipe.read
    end
  end

  def check_command(cmd_a, accept_codes=DEFAULT_ACCEPT_CODES, io_opts={})
    # Run a command, send stdout to syslog, and check the exit status.
    run_and_check(cmd_a, accept_codes, io_opts) do |pipe|
      pipe.each_line do |line|
        line.chomp!
        log("#{cmd_a.first}: #{line}") unless line.empty?
      end
    end
  end

  def replace_file(path, body)
    open(path, "w") { |f| f.write(body) }
  end

  def update_host_state(updates_h)
    @host_state.merge!(updates_h)
    replace_file(@@HOST_STATEFILE, @host_state.to_json)
  end

  def disable_puppet
    check_command(["puppet", "agent", "--disable"])
    @puppet_disabled = true
    loop do
      # Wait for any running puppet agents to finish.
      check_output(["pgrep", "puppet"], 0..1)
      break if $?.exitstatus == 1
      sleep(1)
    end
  end

  def enable_puppet
    check_command(["puppet", "agent", "--enable"])
    @puppet_disabled = false
  end

  def prepare_ping
    begin
      ping_uri_s = File.read(File.join(@@NODEDATA_DIR, "arv-ping-url"))
    rescue Errno::ENOENT
      abort("ping URL file is not present yet, skipping run")
    end

    ping_uri = URI.parse(ping_uri_s)
    payload_h = CGI.parse(ping_uri.query)

    # Collect all extra data to be sent
    dirname = File.join(@@NODEDATA_DIR, "meta-data")
    Dir.open(dirname).each do |basename|
      filename = File.join(dirname, basename)
      if File.file?(filename)
        payload_h[basename.gsub('-', '_')] = File.read(filename).chomp
      end
    end

    ping_uri.query = nil
    @ping_req = Net::HTTP::Post.new(ping_uri.to_s)
    @ping_req.set_form_data(payload_h)
    @ping_client = Net::HTTP.new(ping_uri.host, ping_uri.port)
    @ping_client.use_ssl = ping_uri.scheme == 'https'
  end

  def send_raw_ping
    begin
      response = @ping_client.start do |http|
        http.request(@ping_req)
      end
      if response.is_a? Net::HTTPSuccess
        pong = JSON.parse(response.body)
      else
        raise "response was a #{response}"
      end
    rescue JSON::ParserError => error
      abort("Error sending ping: could not parse JSON response: #{error}")
    rescue => error
      abort("Error sending ping: #{error}")
    end

    replace_file(File.join(@@NODEDATA_DIR, "pong.json"), response.body)
    if pong["errors"] then
      log(pong["errors"].join("; "), Syslog::LOG_ERR)
      if pong["errors"].grep(/Incorrect ping_secret/).any?
        system("halt")
      end
      exit(1)
    end
    pong
  end

  def load_puppet_conf
    # Parse Puppet configuration suitable for rewriting.
    # Save certnames in @puppet_certnames.
    # Save other functional configuration lines in @puppet_conf.
    @puppet_conf = []
    @puppet_certnames = []
    open(@@PUPPET_CONFFILE, "r") do |conffile|
      conffile.each_line do |line|
        key, value = line.strip.split(/\s*=\s*/, 2)
        if key == "certname"
          @puppet_certnames << value
        elsif not (key.nil? or key.empty? or key.start_with?("#"))
          @puppet_conf << line
        end
      end
    end
  end

  def fqdn_from_pong(pong)
    "#{pong['hostname']}.#{pong['domain']}"
  end

  def certname_from_pong(pong)
    fqdn = fqdn_from_pong(pong).sub(".", ".compute.")
    "#{pong['first_ping_at'].gsub(':', '-').downcase}.#{fqdn}"
  end

  def hostname_changed?(pong)
    if @puppetless
      (@host_state["fqdn"] != fqdn_from_pong(pong))
    else
      (@host_state["fqdn"] != fqdn_from_pong(pong)) or
        (@puppet_certnames != [certname_from_pong(pong)])
    end
  end

  def rename_host(pong)
    new_fqdn = fqdn_from_pong(pong)
    log("Renaming host from #{@host_state["fqdn"]} to #{new_fqdn}")

    replace_file("/etc/hostname", "#{new_fqdn.split('.', 2).first}\n")
    check_output(["hostname", new_fqdn])

    ip_address = check_output(["facter", "ipaddress"]).chomp
    esc_address = Regexp.escape(ip_address)
    check_command(["sed", "-i", "/etc/hosts",
                   "-e", "s/^#{esc_address}.*$/#{ip_address}\t#{new_fqdn}/"])

    unless @puppetless
      new_conflines = @puppet_conf + ["\n[agent]\n",
                                      "certname=#{certname_from_pong(pong)}\n"]
      replace_file(@@PUPPET_CONFFILE, new_conflines.join(""))
      FileUtils.remove_entry_secure("/var/lib/puppet/ssl")
    end
  end

  def run_puppet_agent
    log("Running puppet agent")
    enable_puppet
    check_command(["puppet", "agent", "--onetime", "--no-daemonize",
                   "--no-splay", "--detailed-exitcodes",
                   "--ignorecache", "--no-usecacheonfailure"],
                  [0, 2], {err: [:child, :out]})
  end

  def resume_slurm_node(node_name)
    current_state = check_output(["sinfo", "--noheader", "-o", "%t",
                                  "-n", node_name]).chomp
    if %w(down drain drng).include?(current_state)
      log("Resuming node in SLURM")
      check_command(["scontrol", "update", "NodeName=#{node_name}",
                     "State=RESUME"], [0], {err: [:child, :out]})
    end
  end
end

LOCK_DIRNAME = "/var/lock/arvados-compute-node.lock"
begin
  Dir.mkdir(LOCK_DIRNAME)
rescue Errno::EEXIST
  exit(0)
end

ping_sender = nil
begin
  ping_sender = ComputeNodePing.new(ARGV, $stdout, $stderr)
  ping_sender.send
ensure
  Dir.rmdir(LOCK_DIRNAME)
  ping_sender.cleanup unless ping_sender.nil?
end

Previous: Install Node Manager Next: Install the Crunch dispatcher

The content of this documentation is licensed under the Creative Commons Attribution-Share Alike 3.0 United States licence.
Code samples in this documentation are licensed under the Apache License, Version 2.0.