[ previous ] [ next ] [ threads ]
 To :  yate@v...
 From :  Maciek Kaminski <maciejka@t...>
 Subject :  latest rmodule
 Date :  Fri, 07 Oct 2005 15:27:36 +0200


/**
 * rmodule.cpp
 * This file is part of the YATE Project http://YATE.null.ro
 * Copyright (C) 2005 Maciek Kaminski
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include 
#include 
#include 

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

// Every engine type used in this file is taken from the TelEngine namespace.
using namespace TelEngine;

// Module configuration; (re)assigned and loaded in RemoteModule::initialize().
static Configuration s_cfg;

// Listening TCP socket: created, bound and put into listen mode by
// RemoteModule::initialize(), accepted on by RemoteModuleThread::run().
// NOTE(review): unlike the other globals this one is not static - confirm
// nothing outside this file refers to it before changing the linkage.
Socket s_sock;

// List of all live connections; the AbstractConnection constructor appends
// itself here and its destructor removes itself again.
static ObjList s_connectionlist;

// Messages to be omitted by watches (only referenced from commented-out code).
static ObjList s_toBeOmitted;
// Watch lists: each entry is an ObjList of the connections watching one name.
static HashTable s_watches;
// Module mutex, taken around accesses to the lists above.
// NOTE(review): the `true` argument presumably requests a recursive mutex -
// confirm against the Mutex constructor.
static Mutex s_mutex(true);

// Time limit handed to MsgHolder::lock() while waiting for a remote answer
// (the log messages label it as microseconds); stays -1 until configured.
static long long int s_msg_timeout = -1;

// Thread that accepts incoming TCP connections on the module's listening
// socket; the accept loop itself lives in run().
class RemoteModuleThread : public Thread
{
public:
  RemoteModuleThread()
    : Thread("RemoteModule Listener")
  {
  }

  // Thread entry point.
  virtual void run();
};

// Couples a message that was forwarded to a remote module with the state
// needed to wait for its answer. The holder is itself a Mutex:
// AbstractConnection::received() locks it before sending the message and
// AbstractConnection::processLine() unlocks it once an answer has been decoded.
class MsgHolder : public GenObject, public Mutex
{
public:
  MsgHolder(Message &msg);
  // The forwarded message; kept by reference, so the holder must not outlive it.
  Message &m_msg;
  // Result filled in by decode(); this is what received() returns.
  bool m_ret;
  // Identifier sent along with the encoded message and matched on the answer.
  String m_id;
  // Tries to decode `s` as the answer to this message; true on a match.
  bool decode(const char *s);
};

// Forward declaration: AbstractConnection::returnMsg() takes an RMessage*,
// while RMessage itself keeps a pointer back to its AbstractConnection.
class RMessage;

// Checks whether `s` begins with `keyword` and, if it does, consumes the
// keyword from the front of `s`.
// Returns true when the keyword was present (and has been skipped).
static bool startSkip(String &s, const char *keyword)
{
  const bool matched = s.startsWith(keyword,false);
  if (matched)
    s >> keyword;
  return matched;
}

// Base class of every remote module connection. It runs as its own thread
// (run() is supplied by the transport subclass), parses the lines received
// from the remote side and forwards engine messages to it through write().
class AbstractConnection : public Thread, public MessageReceiver
{
public:
  // Registers the connection in the global connection list.
  AbstractConnection();
  // Unregisters the connection, its watches and its message relays.
  ~AbstractConnection();
  // Transport specific read loop.
  virtual void run() = 0;
  // Sends the encoded result of a dispatched RMessage back to the remote side.
  void returnMsg(RMessage *msg, String &id, bool accepted);
  // Handles one complete line of text received from the remote side.
  void processLine(const char *line);
  // Transport specific output; a negative `len` means "use strlen(str)" in
  // the implementations in this file. Returns false on failure.
  virtual bool write(const char *str, int len = -1) = 0;
  // Convenience overload writing a whole String.
  inline bool write(const String &s)
  { return write(s.safe(),s.length()); }
  // Called for messages matched by one of the installed relays: forwards the
  // message to the remote side and waits for the answer.
  bool received(Message &msg, int id);  
  // Messages enqueued on behalf of the remote side; they are not sent back to it.
  ObjList m_reenter;
  // MsgHolder objects of messages currently waiting for a remote answer.
  ObjList m_waiting;
protected:
  bool m_debug;
  // MessageRelay objects installed on request of the remote side.
  ObjList m_relays;
  // Names (String) this connection asked to watch.
  ObjList m_watches;
  // Per connection mutex, taken by processLine() while scanning m_waiting.
  Mutex m_mutex;
};

// Message created on behalf of a remote module. Once the engine has
// dispatched it, the outcome is reported back through the owning connection.
class RMessage : public Message
{
 public:
  // conn   - connection the result is returned to
  // name   - message name
  // retval - optional initial return value
  RMessage(AbstractConnection *conn, const char *name, const char *retval = 0)
    : Message(name, retval),
      m_id(""),
      m_conn(conn)
  {
  }

  // Hands the dispatch outcome to the owning connection.
  virtual void dispatched(bool accepted);

  // Identifier supplied by the remote side, echoed back in the answer.
  String m_id;

 private:
  // Connection that created this message (not owned).
  AbstractConnection* m_conn;
};

void RMessage::dispatched(bool accepted)
{
  m_conn->returnMsg(this, m_id, accepted);
}


// Creates the connection thread object and records it in the global
// connection list while holding the module mutex.
AbstractConnection::AbstractConnection()
  : Thread("RemoteModule Connection"),
    m_debug(false),
    m_mutex(true)
{
  Lock guard(s_mutex);
  s_connectionlist.append(this);
}

// Detaches the connection from all shared state while holding the module
// mutex: the global connection list, every watch list it was added to and the
// message relays it installed in the engine.
AbstractConnection::~AbstractConnection()
{
  m_debug = false;
  Lock guard(s_mutex);
  s_connectionlist.remove(this,false);

  // Drop this connection from the watch list of every name it was watching.
  for (ObjList *p = &m_watches; p; p=p->next()) {
    String *id = static_cast<String *>(p->get());
    if (!id)
      continue;
    ObjList *l = static_cast<ObjList *>(s_watches.get(id));
    if (l)
      l->remove(this, false);
  }

  // Uninstall the relays; skipped once the engine is shutting down.
  for (ObjList *p = &m_relays; p; p=p->next()) {
    MessageRelay *mr = static_cast<MessageRelay *>(p->get());
    if (mr && !Engine::exiting())
      Engine::uninstall(mr);
  }
  m_relays.clear();

  if (m_waiting.get())
    m_waiting.clear();
}

// Handles one complete text line received from the remote side.
//
// The line is first offered to every MsgHolder in m_waiting; if one of them
// decodes it as its answer, the holder is removed from the list and unlocked
// (which releases the thread blocked in received()) and processing stops.
// Otherwise the line is treated as a command: "%%>install:" is handled below,
// and the remains of uninstall / watch / unwatch / message handling follow.
// Anything that is not recognised is logged and reported back as an error.
//
// NOTE(review): this copy of the function is damaged - the text between a '<'
// and the next '>' is missing in several places (see the notes below), so the
// function does not parse as it stands. It must be restored from an intact
// copy of the file; the missing statements cannot be inferred from here.
void AbstractConnection::processLine(const char *line)
{
  Debug("RemoteModule", DebugAll,"Processing line: '%s'", line);

  //For messages waiting to be processed.

  m_mutex.lock();
  ObjList *p = &m_waiting;
  for (; p; p=p->next()) {
    // NOTE(review): the cast target type is missing; the variable is a MsgHolder*.
    MsgHolder *msg = static_cast(p->get());
    if (msg && msg->decode(line)) {
      DDebug("RemoteModule", DebugAll, "Matched message");
      p->remove(false);
      // Wakes up the thread waiting on this holder in received().
      msg->unlock();
      // NOTE(review): returns without m_mutex.unlock() - the lock taken above
      // is still held by this thread after the first matched answer.
      return;
    }
  }

  m_mutex.unlock(); 

  String id(line);
  // "%%>install:<prio>:<name>" - install a relay unless one with that name exists.
  if (startSkip(id,"%%>install:")) {
    int prio = 100;
    id >> prio >> ":";
    bool ok = true;
    ObjList *p = &m_relays;
    for (; p; p=p->next()) {
      MessageRelay *r = static_cast(p->get());
      if (r && (*r == id)) {
	ok = false;
	break;
      }
    }
    if (ok) {
      MessageRelay *r = new MessageRelay(id,this,0,prio);
      m_relays.append(r);
      Engine::install(r);
    }
    Debug("RemoteModule", DebugAll, "Install '%s', prio %d %s", id.c_str(),prio,ok ? "ok" : "failed");
    // NOTE(review): text lost here - the end of the install answer and the
    // start of the uninstall test are fused into one line.
    String out("%%uninstall:")) 
    {
      // Uninstall the relay with the given name, remembering its priority.
      int prio = 0;
      bool ok = false;
      ObjList *p = &m_relays;
      for (; p; p=p->next()) {
	MessageRelay *r = static_cast(p->get());
	if (r && (*r == id)) 
	  {
	    prio = r->priority();
	    Engine::uninstall(r);
	    p->remove();
	    ok = true;
	    break;
	  }
      }
      Debug("RemoteModuleReceiver",DebugAll,"Uninstall '%s' %s", id.c_str(),ok ? "ok" : "failed");
      // NOTE(review): text lost here - uninstall answer fused with the watch test.
      String out("%%watch:")) {
    // Add this connection to the watch list of `id`, creating the list on first use.
    s_mutex.lock();
    ObjList *l = static_cast(s_watches.get(id));    
    if(!l) {
      l = new ObjList();
      s_watches.put(id, static_cast(l));
    }
    l->append(this);
    m_watches.append(new String(id))->setDelete(true);
    s_mutex.unlock();
    Debug("RemoteModuleReceiver", DebugAll, "Watch '%s' %s", id.c_str(), true ? "ok" : "failed");
    // NOTE(review): text lost here - watch answer fused with the unwatch test.
    String out("%%unwatch:")) 
    {
      // Remove this connection from the watch list of `id`.
      s_mutex.lock();
      ObjList *l = static_cast(s_watches.get(id));
      if(l) {
	l->remove(this, false);
      }
      s_mutex.unlock();
      m_watches.remove(&id);
      Debug("RemoteModuleReceiver",DebugAll,"Unwatch '%s' %s", id.c_str(), true ? "ok" : "failed");
      // NOTE(review): text lost here - the unwatch answer, the creation of the
      // message `m` and the start of its decode call are all missing.
      String out("%%decode(line, id) == -2) {
	m->m_id = id;
	Debug("RemoteModuleReceiver",DebugAll,"Created message '%s' [%p] from:\n%s", m->c_str(), m, line);
	// A "userData" parameter names a channel: look it up with chan.locate
	// and attach the object found to the message.
	if(m->getParam("userData")) {
	  Message msg_locate("chan.locate");
	  msg_locate.addParam("id", m->getParam("userData")->c_str());
	  if(!Engine::dispatch(msg_locate)) {     
	    Debug("RemoteModuleConnection",DebugAll, "Can't locate channel: '%s'", m->getParam("userData")->c_str());
	  } else {
	    Debug("RemoteModuleConnection",DebugAll, "Channel: '%s' inserted into message ", m->getParam("userData")->c_str());
	    m->userData(msg_locate.userData());
	  }
	  m->clearParam("userData");
	}
	/* Temporary add to the reenter list to avoid reentrance */
	s_mutex.lock();
	m_reenter.append(m)->setDelete(false);
	s_mutex.unlock();
        Engine::enqueue(static_cast(m));
	return;
      } else {
	delete m;
      }
    }
  // Nothing above recognised the line.
  Debug("RemoteModuleConnection", DebugGoOn, "Error: '%s'", line);
  write("Error in: " + String(line));
}


// Sends the result of a dispatched RMessage back to the remote side.
//   msg      - the message that has just been dispatched
//   id       - identifier echoed back in the encoded answer
//   accepted - whether the message was handled
// The message is also taken off the reenter list; both steps happen while
// the module mutex is held.
void AbstractConnection::returnMsg(RMessage *msg, String &id, bool accepted) 
{
  String line(msg->encode(accepted, id));
  XDebug("RemoteModule", DebugAll, "Message response: %s", line.c_str());
  line << "\n";

  Lock guard(s_mutex);
  m_reenter.remove(msg, false);
  write(line);
}

// Forwards an engine message to the remote side and waits for its answer.
//   msg - message matched by one of this connection's relays
//   id  - relay identifier (unused)
// Returns the result decoded from the remote answer, or false when the
// message originated from this connection, could not be written, or was not
// answered within s_msg_timeout.
bool AbstractConnection::received(Message &msg, int id)
{
#ifndef NDEBUG
    u_int64_t t = Time::now();
#endif

  s_mutex.lock();
  // Messages enqueued on behalf of this connection are not sent back to it.
  if (m_reenter.find(&msg)) {
    s_mutex.unlock();
    return false;
  }

  // The holder is locked here and unlocked by processLine() when the answer
  // arrives, so the second lock() below blocks until then.
  MsgHolder h(msg);
  h.lock();

  m_waiting.append(&h)->setDelete(false);

  if(!write(msg.encode(h.m_id) << "\n")) {
    m_waiting.remove(&h, false);
    // Release everything taken above before bailing out: the holder must not
    // be destroyed while locked and the module mutex must not stay held.
    h.unlock();
    s_mutex.unlock();
    Debug("RemoteModule", DebugAll,"Can't queue message '%s' [%p]", msg.c_str(),&msg);
    cancel();
    return false;
  }
  s_mutex.unlock();

  Debug("RemoteModule", DebugAll,"Queued message '%s' [%p] with timeout: %lldus",msg.c_str(),&msg, s_msg_timeout);
  if(!h.lock(s_msg_timeout)) {
    // NOTE(review): m_waiting is modified here without any lock while
    // processLine() may be scanning it - confirm the intended locking scheme.
    m_waiting.remove(&h, false);
    Debug("RemoteModule", DebugGoOn, "Mesage '%s'[%p] not handled within given time limit(%lldus).", msg.c_str(), &msg, s_msg_timeout);
  }
  h.unlock();

#ifndef NDEBUG
  Debug("RemoteModule", DebugAll,"Mesage '%s' [%p] returning %s in " FMT64U " usecs", msg.c_str(), &msg, h.m_ret ? "true" : "false", Time::now() - t);
#endif
  /*
  if(h.m_ret) {
    s_mutex.lock();
    s_toBeOmitted.append(&msg);
    s_mutex.unlock();
  }
  */
  return h.m_ret;
}

// The plugin object: loads the configuration, sets up the listening socket,
// starts the optional pipe connection and installs the post-dispatch hook.
class RemoteModule : public Plugin
{
public:
  RemoteModule();
  ~RemoteModule();
  // Called by the engine to (re)initialize the module.
  virtual void initialize();
private:
  // True until initialize() has installed the hook and started the listener thread.
  bool m_first;
};

// Wraps `msg` and builds the identifier used to match its answer: the
// (truncated) address of this holder followed by a random number.
MsgHolder::MsgHolder(Message &msg)
  : Mutex(), m_msg(msg), m_ret(false)
{
  // the address of this object should be unique
  // Go through unsigned long first: casting a pointer straight to a narrower
  // integer is rejected by compilers where pointers are wider than int.
  m_id = static_cast<unsigned int>(reinterpret_cast<unsigned long>(this));
  m_id << (unsigned int)random();
}

// Attempts to decode `s` as the answer to the held message, storing the
// decoded result in m_ret. A decode result of -2 counts as a match.
bool MsgHolder::decode(const char *s)
{
  const int rc = m_msg.decode(s,m_ret,m_id);
  return rc == -2;
}

// /////////////////////////////////////////////////////////////////////////////
// TCPConnection
// /////////////////////////////////////////////////////////////////////////////
// Remote module connection carried over an accepted TCP socket.
class TCPConnection : public AbstractConnection
{
public:
  // Takes ownership of `sock`; it is deleted by the destructor.
  TCPConnection(Socket* sock, const char* addr);
  ~TCPConnection();
  // Reads from the socket and feeds complete lines to processLine().
  virtual void run();
  // Writes `len` bytes of `str` (or strlen(str) when len is negative).
  virtual bool write(const char *str, int len = -1);
  // Validates the socket, then creates and starts a connection for it.
  // Returns 0 when no connection was created.
  static TCPConnection *checkCreate(Socket* sock, const char* addr = 0);
private:
  Socket* m_socket;
  // Textual peer address as passed to the constructor.
  String m_address;
};

// Creates and starts a connection thread for an accepted socket.
//   sock - accepted socket; deleted here when it is not valid, otherwise
//          owned by the new connection
//   addr - textual peer address
// Returns the started connection, or 0 when `sock` is null or invalid or the
// new thread object reports an error.
TCPConnection *TCPConnection::checkCreate(Socket* sock, const char* addr)
{
  if (!sock)
    return 0;

  const bool usable = sock->valid();
  if (!usable) {
    delete sock;
    return 0;
  }

  // should check IP address here
  TCPConnection *tcpConn = new TCPConnection(sock,addr);
  if (!tcpConn->error()) {
    tcpConn->startup();
    return tcpConn;
  }

  delete tcpConn;
  return 0;
}

// Stores the accepted socket (owned from now on) and the peer address.
TCPConnection::TCPConnection(Socket* sock, const char* addr)
  : AbstractConnection(),
    m_socket(sock),
    m_address(addr)
{
}

// Releases the socket owned by this connection.
TCPConnection::~TCPConnection()
{
  if (m_socket) {
    delete m_socket;
    m_socket = 0;
  }
}

// Connection thread body: switches the socket to non-blocking mode, sends the
// configured greeting and then reads data, splitting it into lines that are
// handed to processLine(). Returns (ending the thread) on EOF, on a socket
// error that cannot be retried, or when a line does not fit in the buffer.
void TCPConnection::run()
{
  if (!m_socket)
    return;
  if (!m_socket->setBlocking(false)) {
    Debug("RemoteModule",DebugGoOn, "Failed to set tcp socket to nonblocking mode: %s\n",strerror(m_socket->error()));
    return;
  }

  // For the sake of responsiveness try to turn off the tcp assembly timer.
  // TCP_NODELAY is a TCP level option, so it has to be set at IPPROTO_TCP;
  // at SOL_SOCKET the same number selects an unrelated socket option.
  int arg = 1;
  if (!m_socket->setOption(IPPROTO_TCP, TCP_NODELAY, &arg, sizeof(arg)))
    Debug("RemoteModule",DebugWarn, "Failed to set tcp socket to TCP_NODELAY mode: %s\n", strerror(m_socket->error()));

  const char *hdr = s_cfg.getValue("general","header","YATE (http://YATE.null.ro) ready.");
  if (hdr) {
    if(!write(hdr)) {
      return;
    }
    if(!write("\n")) {
      return;
    }
  }

  struct timeval timer;
  char buffer[1024];
  // Number of bytes of an incomplete line kept at the start of the buffer.
  int posinbuf = 0;
  while (posinbuf < (int)sizeof(buffer)-1) {
    // Wait at most 30ms for data so the loop stays responsive.
    timer.tv_sec = 0;
    timer.tv_usec = 30000;
    bool readok = false;
    bool error = false;
    if (m_socket->select(&readok,0,&error,&timer)) {
      if (error) {
        Debug("RemoteModule", DebugInfo, "Socket exception condition on %d", m_socket->handle());
        return;
      }
      if (!readok)
        continue;
      int readsize = m_socket->readData(buffer+posinbuf, sizeof(buffer)-posinbuf-1);
      if (!readsize) {
        Debug("RemoteModule", DebugInfo, "Socket condition EOF on %d", m_socket->handle());
        return;
      } else if (readsize > 0) {
        int totalsize = readsize + posinbuf;
        buffer[totalsize]=0;
        Debug("RemoteModule", DebugAll,"tcp read: '%s'", buffer);
        for (;;) {
          // Try to accomodate various telnet modes
          char *eoline = ::strchr(buffer,'\r');
          if (!eoline)
            eoline = ::strchr(buffer,'\n');
          // An embedded NUL byte also terminates a line.
          if (!eoline && ((int)::strlen(buffer) < totalsize))
            eoline=buffer+::strlen(buffer);
          if (!eoline)
            break;
          *eoline=0;
          if (buffer[0]) {
            processLine(buffer);
          }
          // Drop the processed line and its terminator, keeping the final NUL.
          totalsize -= eoline-buffer+1;
          ::memmove(buffer,eoline+1,totalsize+1);
        }
        posinbuf = totalsize;
      }
      else if (!m_socket->canRetry()) {
        Debug("RemoteModule",DebugWarn,"Socket read error %d on %d", errno, m_socket->handle());
        return;
      }
    }
    else if (!m_socket->canRetry()) {
      Debug("RemoteModule",DebugWarn,"Socket select error %d on %d", errno, m_socket->handle());
      return;
    }
  }
}

bool TCPConnection::write(const char *str, int len) {

  XDebug("RemoteModule", DebugAll,"write: '%s'", str);

  int written = 0;
  int l = len < 0 ? ::strlen(str) : len ;

  while( l > 0 ) {
        
    int w =  m_socket->writeData(str + written, l);
    if( w < 0 ) {
      Debug("RemoteModule",DebugGoOn,"Unable to write to socket: %s", strerror(s_sock.error()));
      cancel();
      return false;
    }
    written += w;
    l -= w;
  } 
  return true;
}

void RemoteModuleThread::run()
{
  for (;;)
    {
      SocketAddr sa;
      Socket* as = s_sock.accept(sa);
      if (!as) {
	Debug("RemoteModule", DebugWarn, "Accept error: %s\n", strerror(s_sock.error()));
	continue;
      } else {
	String addr(sa.host());
	addr << ":" << sa.port();
	if (!TCPConnection::checkCreate(as,addr)) {
	  Debug("RemoteModule",DebugWarn,"Connection rejected for %s",addr.c_str());
	}
      }
    }
}

// /////////////////////////////////////////////////////////////////////////////
// PipeConnection
// /////////////////////////////////////////////////////////////////////////////
// Remote module connection to a child process started through /bin/sh,
// talking over a pair of pipes attached to the child's stdin and stdout.
class PipeConnection : public AbstractConnection
{
public:
  PipeConnection(String command_line);
  // Sends SIGTERM to the child (if any) and closes both pipe ends.
  ~PipeConnection();
  // Starts the child, then reads its output and feeds lines to processLine().
  virtual void run();
  // Writes to the child's stdin.
  virtual bool write(const char *str, int len = -1);
  // Creates and starts a connection; returns 0 for an empty command line or
  // when the thread object reports an error.
  static PipeConnection *checkCreate(String command_line);
private:
  // Creates the pipes and forks/execs the child; false on failure.
  bool create();
  // Shell command used to start the child.
  String m_command_line;
  // Child process id; 0 while no child is running.
  pid_t m_pid;
  // Read end of the child -> yate pipe.
  int m_in;
  // Write end of the yate -> child pipe.
  int m_out;
};

// Creates and starts a pipe connection running `command_line`.
// Returns the started connection, or 0 when the command line is empty or the
// new thread object reports an error.
PipeConnection *PipeConnection::checkCreate(String command_line)
{
  if (!command_line)
    return 0;

  PipeConnection *pipeConn = new PipeConnection(command_line);
  if (!pipeConn->error()) {
    pipeConn->startup();
    return pipeConn;
  }

  delete pipeConn;
  return 0;
}

// Remembers the command to run; the child process itself is started later,
// from run(). Both descriptors start at -1 ("not open") because the
// destructor closes any descriptor that is >= 0, even if create() never ran.
PipeConnection::PipeConnection(String command_line)
  : AbstractConnection(), m_command_line(command_line), m_pid(0),
    m_in(-1), m_out(-1) {}

// Asks the child process to terminate and closes whichever pipe ends are
// still open, all while holding the module mutex.
PipeConnection::~PipeConnection() {
  Lock lock(s_mutex);
  Debug("RemoteModule", DebugAll,"PipeConnection::~PipeConnection() [%p]",this);

  if (m_pid > 0)
    ::kill(m_pid,SIGTERM);

  // Same treatment for both descriptors: close if open, then mark as closed.
  int *pipeEnds[] = { &m_in, &m_out };
  for (unsigned int i = 0; i < 2; i++) {
    int &fd = *pipeEnds[i];
    if (fd < 0)
      continue;
    ::close(fd);
    fd = -1;
  }
}

// Starts the external program: creates one pipe per direction, forks, and in
// the child attaches the pipes to stdout/stdin before exec'ing the command
// line through /bin/sh. In the parent the descriptors that are kept are
// stored in m_in / m_out and the child's pid in m_pid.
// Returns false (with every descriptor created so far closed) when a pipe
// or the fork fails; the child never returns from this function.
bool PipeConnection::create()
{
  int pid;
  // Index 0 is the read end and index 1 the write end of each pipe.
  int ext2yate[2];
  int yate2ext[2];

  if (::pipe(ext2yate)) {
    Debug("RemoteModule", DebugWarn, "Unable to create ext->yate pipe: %s",strerror(errno));
    return false;
  }

  if (::pipe(yate2ext)) {
    Debug("RemoteModule", DebugWarn, "Unable to create yate->ext pipe: %s",strerror(errno));
    ::close(ext2yate[0]);
    ::close(ext2yate[1]);
    return false;
  }

  pid = ::fork();
  if (pid < 0) {
    Debug("RemoteModule", DebugWarn, "Failed to fork(): %s", strerror(errno));
    ::close(ext2yate[0]);
    ::close(ext2yate[1]);
    ::close(yate2ext[0]);
    ::close(yate2ext[1]);
    return false;
  }

  if (!pid) {
    /* In child - terminate all other threads if needed */
    Thread::preExec();
    /* Try to immunize child from ^C and ^\ */
    ::signal(SIGINT,SIG_IGN);
    ::signal(SIGQUIT,SIG_IGN);
    /* And restore default handlers for other signals */
    ::signal(SIGTERM,SIG_DFL);
    ::signal(SIGHUP,SIG_DFL);
    /* Attach the pipes: the child writes to us on stdout and reads from us on stdin */
    ::dup2(ext2yate[1], STDOUT_FILENO);
    ::dup2(yate2ext[0], STDIN_FILENO);
    /* stdin is in use by the pipe, so it is no longer closed here */
    //::close(STDIN_FILENO);
    /* Close everything but stdin/stdout/stderr */
    for (int x=STDERR_FILENO+1;x<1024;x++) 
      ::close(x);
    /* Execute script */
    if (debugAt(DebugInfo))
      ::fprintf(stderr, "Execing '%s'\n", m_command_line.c_str());
    ::execl("/bin/sh", "sh", "-c", m_command_line.c_str(), (char *)NULL);

    /* Only reached when execl() failed */
    ::fprintf(stderr, "Failed to execute '%s': %s\n", m_command_line.c_str(), strerror(errno));
    /* Leave the child at once, without running any exit handlers */
    ::_exit(1);
  }
  Debug("RemoteModule", DebugInfo,"Launched External Script %s, pid: %d", m_command_line.c_str(), pid);
  m_in = ext2yate[0];
  m_out = yate2ext[1];

  /* close what we're not using in the parent */
  ::close(ext2yate[1]);
  ::close(yate2ext[0]);

  m_pid = pid;
  return true;
}


// Connection thread body: starts the child process, then reads its output
// and hands every complete line to processLine(). The thread ends on EOF
// (closing the read end of the pipe) or on a read error.
void PipeConnection::run()
{
  const bool launched = create();
  if (!launched) {
    m_pid = 0;
    return;
  }

  char buf[1024];
  // Bytes of an unfinished line carried over at the start of the buffer.
  int pending = 0;

  DDebug("RemoteModule", DebugAll,"PipeConnection::run() entering loop [%p]",this);

  while (true) {
    // A closed input descriptor is treated like EOF.
    int got = 0;
    if (m_in >= 0)
      got = ::read(m_in,buf+pending,sizeof(buf)-pending-1);
    DDebug("RemoteModule", DebugAll,"Read %d",got);

    if (got < 0) {
      Debug("RemoteModule", DebugWarn,"Read error %d on %d [%p]",errno,m_in,this);
      return;
    }
    if (got == 0) {
      Debug("RemoteModule", DebugInfo,"Read EOF on %d [%p]",m_in,this);
      if (m_in != -1) {
        ::close(m_in);
        m_in = -1;
      }
      return;
    }

    int avail = got + pending;
    buf[avail] = 0;
    XDebug("RemoteModule", DebugAll,"pipe read: '%s'", buf);

    // Peel complete lines off the front of the buffer.
    for (;;) {
      char *lineEnd = ::strchr(buf,'\n');
      if (!lineEnd) {
        // No newline: an embedded NUL byte also ends a line, otherwise wait
        // for more data.
        const int textLen = (int)::strlen(buf);
        if (textLen >= avail)
          break;
        lineEnd = buf + textLen;
      }
      *lineEnd = 0;
      if (buf[0])
        processLine(buf);
      // Drop the line and its terminator, keeping the final NUL.
      avail -= lineEnd-buf+1;
      ::memmove(buf,lineEnd+1,avail+1);
    }
    pending = avail;
  }
}

// Writes `len` bytes of `str` to the child's stdin, or strlen(str) bytes when
// `len` is negative, repeating the call until everything was written.
// On a write error the connection thread is cancelled and false is returned.
bool PipeConnection::write(const char *str, int len) {
  XDebug("RemoteModule", DebugAll,"write: '%s'", str);

  int written = 0;
  int l = len < 0 ? ::strlen(str) : len ;

  while( l > 0 ) {
    // Continue after what was already sent and ask only for what is left;
    // `len` itself may be the -1 "use strlen" marker.
    int w = ::write(m_out, str + written, l);
    if( w < 0 ) {
      // An interrupted call wrote nothing; just try again.
      if (errno == EINTR)
        continue;
      Debug("RemoteModule",DebugGoOn,"Unable to write to pipe: %s", strerror(errno));
      cancel();
      return false;
    }
    written += w;
    l -= w;
  }
  return true;
}

// Post-dispatch hook installed in the engine by RemoteModule::initialize();
// it passes dispatched messages on to the connections watching them.
class PostHook : public MessagePostHook
{
public:
    // msg     - the message that has just been dispatched
    // handled - whether the message was handled
    virtual void dispatched(const Message& msg, bool handled);
};

void PostHook::dispatched(const Message& msg, bool handled)
{
  s_mutex.lock();
  ObjList *p = static_cast(s_watches.get(&msg));
  for (; p; p=p->next()) {
    AbstractConnection *con = static_cast(p->get());
    if (con) {
      if(!con->m_reenter.find(&msg)) {
        //if(!s_toBeOmitted.find(&msg)) {
          con->write(msg.encode(handled,"")<< "\n");
	  //}
      }
    }
  }
  //s_toBeOmitted.remove(const_cast(&msg), false);
  s_mutex.unlock();
};

// Announces that the module was loaded; the real setup is left to the first
// call of initialize().
RemoteModule::RemoteModule()
  : m_first(true)
{
  //Debugger::setIntOut(dbg_remote_func);
  Output("Loaded module RemoteModule");
}

// Shuts the module down: terminates the listening socket, empties the
// connection list under the module mutex and resets the debugger output hook.
RemoteModule::~RemoteModule()
{
  Output("Unloading module RemoteModule");
  s_sock.terminate();
  {
    Lock guard(s_mutex);
    s_connectionlist.clear();
  }
  Debugger::setIntOut(0);
}

// (Re)initializes the module from the "rmodule" configuration file.
// Every call reloads the configuration and the answer timeout. While no
// listening socket exists yet it also:
//  - creates the TCP listener on [tcp] addr:port unless the port is 0,
//  - starts the external program named by [pipe] script, if any,
//  - on the first time through, installs the post-dispatch hook and starts
//    the listener thread.
// Any failure while setting up the TCP listener abandons the rest of the setup.
void RemoteModule::initialize()
{
  Output("Initializing module RemoteModule");
  s_cfg = Engine::configFile("rmodule");
  s_cfg.load();

  // The timeout lives in [general] and is used by every connection type, so
  // read it regardless of whether (and how successfully) TCP is set up.
  s_msg_timeout = s_cfg.getIntValue("general", "timeout", -1);

  if (s_sock.valid())
    return;

  // check configuration
  int port = s_cfg.getIntValue("tcp","port",5038);
  if(port) {
    const char *host = c_safe(s_cfg.getValue("tcp","addr","127.0.0.1"));
    if (!*host)
      return;

    s_sock.create(AF_INET, SOCK_STREAM);
    if (!s_sock.valid()) {
      Debug("RemoteModule",DebugGoOn,"Unable to create the listening socket: %s",strerror(s_sock.error()));
      return;
    }

    const int reuseFlag = 1;
    s_sock.setOption(SOL_SOCKET,SO_REUSEADDR,&reuseFlag,sizeof(reuseFlag));

    SocketAddr sa(AF_INET);
    sa.host(host);
    sa.port(port);
    if (!s_sock.bind(sa)) {
      Debug("RemoteModule",DebugGoOn,"Failed to bind to %s:%u : %s",sa.host().c_str(),sa.port(),strerror(s_sock.error()));
      s_sock.terminate();
      return;
    }
    if (!s_sock.listen(2)) {
      Debug("RemoteModule",DebugGoOn,"Unable to listen on socket: %s\n", strerror(s_sock.error()));
      s_sock.terminate();
      return;
    }
  }

  if(s_cfg.getValue("pipe","script")) {
    String command_line(s_cfg.getValue("pipe","script"));
    if (!PipeConnection::checkCreate(command_line)) {
      Debug("RemoteModule",DebugWarn,"Connection can't be create for %s", command_line.c_str());
    }
  }

  if (m_first) {
    m_first = false;
    Engine::self()->setHook(new PostHook);
    RemoteModuleThread *mt = new RemoteModuleThread;
    mt->startup();
  }
}

// Engine plugin registration macro for the RemoteModule class.
// NOTE(review): presumably creates the single plugin instance when the
// module is loaded - confirm against the macro's definition.
INIT_PLUGIN(RemoteModule);

/* vi: set ts=8 sw=4 sts=4 noet: */