ZeroC IceStorm: gestionando eventos

IcenetworkingPython

En esta receta se describe qué es IceStorm (el servicio de publicación-subscripción de ICE) junto con algunos ejemplos que muestran su funcionalidad.

Antecedentes

En Empezar con ZeroC Ice en Debian se describió cómo adentrarse en ZeroC Ice. Ahora llega el turno de conocer IceStorm, uno de los servicios que nos ofrece esta herramienta multiusos.

Nota: Para probar los ejemplos de esta receta se ha utilizado la versión 3.2.1-3 de Ice y la versión 2.4.4 de Python.

Conceptos básicos

IceStorm es un servicio eficiente de publicación-subscripción para aplicaciones Ice. Desde un punto de vista funcional, IceStorm actúa como mediador entre el publicador y el subscriptor, proporcionando varias ventajas:

  • Sólo es necesaria una llamada al servicio IceStorm para distribuir la información a los subscriptores.
  • Independencia entre el emisor y los receptores de información, permitiendo que el primero se ocupe de las responsabilidades relativas a la aplicación y no de tareas administrativas.
  • Los cambios introducidos en el código son mínimos para incorporar la funcionalidad de IceStorm.

De manera general, una aplicación indica su interés en recibir mensajes subscribiéndose a un topic. Un servidor IceStorm soporta cualquier número de topics, los cuales son creados dinámicamente y distinguidos por un nombre único. Cada topic puede tener varios publicadores y subscriptores. Un topic es equivalente a una interfaz Slice: las operaciones del interfaz definen los tipos de mensajes soportados por el topic. Un publicador usa un proxy al interfaz topic para enviar sus mensajes, y un subscriptor implementa la interfaz topic (o derivada) para recibir los mensajes. Realmente dicha interfaz representa el contrato entre el publicador (cliente) y el subscriptor (servidor), excepto que IceStorm encamina cada mensaje a múltiples receptores de forma transparente.

Un ejemplo sencillo

A continuación se mostrará un ejemplo sencillo de cómo utilizar IceStorm. Dicho ejemplo consiste en una serie de objetos cuyo propósito es procesar una determinada estructura de datos. Para ello, se define el siguiente archivo a través de Slice (simple.ice):

module Simple
{
	struct Message {
	       string content;
	};
	interface Processor {
		void process (Message m);  
	};
};

Básicamente, disponemos de una estructura que contiene una cadena y una interfaz con una sencilla operación de procesado. Teniendo en cuenta la funcionalidad básica de IceStorm, con esta sencilla aproximación podríamos diseñar un sistema que permitiera procesar esta estructura por distintos servidores empleando una única llamada a IceStorm y de forma transparente para el publicador.

Creando el publicador

Veamos ahora el código del publicador (publisher.py):

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, Ice, IceStorm
Ice.loadSlice('simple.ice', ['-I', '/usr/share/slice'])
import Simple

class Publisher (Ice.Application):
    def run (self, argv):
        # Proxy al TopicManager.
        prx = self.communicator().stringToProxy('Simple/TopicManager:tcp -p 10000')
        topicManagerPrx = IceStorm.TopicManagerPrx.checkedCast(prx)
        # Proxy al topic.
        try:
            topicPrx = topicManagerPrx.retrieve('MyTopic')
        except IceStorm.NoSuchTopic:
            topicPrx = topicManagerPrx.create('MyTopic')
        # Proxy al objeto que publica.
        pub = topicPrx.getPublisher()
        processorPrx = Simple.ProcessorPrx.uncheckedCast(pub)
        # Publicación.
        m = Simple.Message('content')
        processorPrx.process(m)
        return 0

Publisher().main(sys.argv)

Como se puede apreciar, el código habla por sí mismo (incluso sin fijarnos en los comentarios). Ésta es una de las principales ventajas de utilizar Ice: hace que el crear aplicaciones distribuidas implique muy pocos cambios en el código de la aplicación. A nivel general, se llevan a cabo los siguientes pasos:

  1. Se crea un proxy a un objeto de tipo TopicManager.
  2. Se recupera un proxy al topic en cuestión.
  3. Se obtiene un proxy al publicador.
  4. Se invoca a la operación remota process.

Creando el subscriptor

Sencillo, ¿verdad? Pues el código del subscriptor (subscriptor.py) se mantiene en esa misma línea:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys, Ice, IceStorm
Ice.loadSlice('simple.ice', ['-I', '/usr/share/slice'])
import Simple

class ProcessorI (Simple.Processor):
    def process (self, message, curr):
        print message.content

class Subscriber (Ice.Application):
    def run (self, argv):
        self.shutdownOnInterrupt()
        # Proxy al TopicManager.
        prx = self.communicator().stringToProxy('Simple/TopicManager:tcp -p 10000')
        topicManagerPrx = IceStorm.TopicManagerPrx.checkedCast(prx)
        # Creación del objeto remoto.
        oa = self.communicator().createObjectAdapterWithEndpoints('SubscriberOA', 'default')
        processorPrx = oa.addWithUUID(ProcessorI())
        # Proxy al topic.
        try:
            topicPrx = topicManagerPrx.retrieve('MyTopic')
        except IceStorm.NoSuchTopic:
            topicPrx = topicManagerPrx.create('MyTopic')
        # Subscripción.
        topicPrx.subscribeAndGetPublisher(None, processorPrx)
        oa.activate()
        self.communicator().waitForShutdown()
        topicPrx.unsubscribe(processorPrx)
        return 0

Subscriber().main(sys.argv)

El código es prácticamente idéntico. Sin embargo, en este caso tenemos que crear un adaptador de objetos para albergar el sirviente y, posteriormente, registrar este último (encargado de encarnar al objeto remoto). La parte más importante es la relativa a la operación subscribeAndGetPublisher, la cual añade un proxy subscriptor al topic (processorPrx) sin tener en cuenta la calidad del servicio (None).

Ejecutando IceStorm

Llegados a este punto ya tenemos definidas estas sencillas versiones del publicador y el subscriptor. Ahora sólo queda ejecutar IceStorm y poner en funcionamiento el sistema. IceStorm es un servicio relativamente ligero que se implementa como un servicio IceBox. Por lo tanto, el primer paso consiste en arrancar IceStorm:

$ icebox --Ice.Config=config.icebox

El archivo de configuración básico (config.icebox) podría ser el siguiente:

IceBox.ServiceManager.Endpoints=tcp -p 9999
IceBox.Service.IceStorm=IceStormService,32:createIceStorm --Ice.Config=config.icestorm

Simplemente se informa a IceBox que se va a levantar una instancia del servicio IceStorm, con unas determinadas propiedades especificadas en el archivo de configuración asociado (config.icestorm):

IceStorm.InstanceName=Simple
IceStorm.TopicManager.Proxy=Simple/TopicManager:default -p 10000
IceStorm.TopicManager.Endpoints=tcp -p 10000
IceStorm.Publish.Endpoints=tcp
Freeze.DbEnv.IceStorm.DbHome=db

En este archivo de configuración se especifica el nombre de la instancia de IceStorm, la dirección de escucha de un proxy del tipo TopicManager y el directorio utilizado por IceStorm para gestionar su persistencia a través de Freeze (servicio de persistencia de Ice). Ya sólo queda ejecutar el subscriptor (podemos ejecutar tantos como queramos para comprobar que todos reciben las invocaciones remotas):

$ python subscriptor.py

Y el publicador:

$ python publisher.py

Federación de topics

Una de las funcionalidades de IceStorm más interesantes es la posibilidad de establecer una jerarquía entre los distintos topics, creando un grafo dirigido en el que se establece el concepto de coste. En otras palabras, podemos establecer una jerarquía de topics de manera que los mensajes fluirán por los distintos enlaces que conectan los topics en función del coste asociado al mensaje por una parte, y al enlace por otra.

Imaginemos un grafo en el que tenemos tres topics: A, B y C. A tiene un enlace con B de coste 1, mientras que A tiene un enlace con C de coste 2, es decir:

A ----- c(1) -----> B
A ----- c(2) -----> C

Por otra parte, supongamos un subscriptor Sb vinculado a B, un subscriptor Sc vinculado a C y un publicador P vinculado a A. Si P envía un mensaje de coste 1, este mensaje llegará a A y fluirá a B y C debido a que el coste del mensaje es igual o menor que el de los enlaces, llegando por tanto a los subscriptores Sb y Sc. Sin embargo, si P envía un mensaje de coste 2, éste sólo llegará a Sc ya que el enlace existente entre A y B tiene un coste 1, por lo que el subscriptor Sb quedaría sin notificación y, por otra parte, Sc sí recibiría el mensaje. Merece la pena destacar que un mensaje de coste 0 fluye por todos los enlaces.

Desplegar esta jerarquía de topics en IceStorm es muy sencillo con la herramienta de administración de IceStorm:

$ icestormadmin --Ice.Config=config.icestorm graph.txt

El archivo graph.txt representa dicha jerarquía y es muy simple:

create A B C
link A B 1
link A C 2

Con este sencillo archivo de texto creamos los topics y los enlaces entre ellos.

Ahora debemos adaptar el publicador y el subscriptor incluyendo esta nueva funcionalidad. El publicador sólo necesita vincularse al topic A y asociar el coste al mensaje. Para ello se utiliza el argumento implícito de contexto que toda operación de un proxy conlleva. En Python es muy simple:

....
# Proxy al topic.
        try:
            topicPrx = topicManagerPrx.retrieve('A')
        except IceStorm.NoSuchTopic:
            topicPrx = topicManagerPrx.create('A')
....
# Publicación.
        m = Simple.Message('content')
        ctx = {}
        ctx['cost'] = '2'
        processorPrx.process(m, ctx)
....

En el subscriptor podemos añadir la funcionalidad al sirviente para que imprima el coste:

class ProcessorI (Simple.Processor):
    def process (self, message, curr):
        print 'Message: ' + message.content
        print 'Cost: ' + curr.ctx.get('cost')

Y para probar nuestro ejemplo con los dos subscriptores, utilizaremos una simple comparación:

....
 # Proxy al topic.
        try:
            if argv[1] == 'B':
                topicPrx = topicManagerPrx.retrieve('B')
            else:
                topicPrx = topicManagerPrx.retrieve('C')
        except IceStorm.NoSuchTopic:
            pass
        # Subscripción.
....

Para probar este sencillo ejemplo basta con seguir los siguientes pasos:

$ icebox --Ice.Config=config.icebox
$ icestormadmin --Ice.Config=config.icestorm graph.txt
$ python subscriber.py B
$ python subscriber.py C
$ python publisher.py

En este ejemplo en concreto, el mensaje sólo llegará al subscriptor vinculado al topic C:

david@TheSecondWorldHeaven:~/Universidad/ZeroC_ICE/src/icestorm/topicFederation$ python subscriber.py C
Message: content
Cost: 2

Conclusiones

Con IceStorm podemos gestionar el envío y la recepción de eventos de una manera sencilla, flexible y eficiente. La conclusión más importante que podemos obtener es la ventaja que supone el hecho de poder centrarte en la aplicación en cuestión, minimizando el tiempo empleado en cuestiones ajenas a la naturaleza de la misma y obteniendo un sistema eficiente. En particular, la federación de topics ofrece características muy interesantes a la hora de crear aplicaciones distribuidas, permitiendo jugar con la distribución de eventos a nuestro antojo.

¿Y ahora qué?

Por supuesto, todo este despliegue puede ser más sencillo y más transparente si utilizamos IceGrid tanto para desplegar la instancia de IceStorm como para localizar los distintos objetos involucrados en la aplicación. Además, es posible crear un sistema totalmente dinámico que gestione la creación y destrucción de topics en función de la naturaleza de nuestra aplicación. Por otra parte, IceStorm permite añadir más funcionalidades descritas en profundidad en el manual de Ice. Todo eso queda para una futura receta.

Referencias

Comentarios

Opciones de visualización de comentarios

Seleccione la forma que prefiera para mostrar los comentarios y haga clic en «Guardar las opciones» para activar los cambios.

Simplificando el publisher

Hola KO_mOd_O, Muchas gracias por la receta.

No entiendo por qué tu publisher llama a waitForShutdown, ¿para qué?

Tampoco entiendo por qué usas un proxy oneway del topic. El Publish endpoint del IceStorm está configurado como tcp, puede ser twoway sin problemas...

Tienes razón... No hay

Tienes razón... No hay motivo para llamar a waitForShut (ni shutdownOnInterrupt) en el publisher. Además, la clase Ice::Application ya se encarga de la correcta finalización del communicator. En cuanto al proxy oneway, es más lógico obviar la naturaleza del proxy en un ejemplo más sencillo, obteniendo un código más simplificado aún. Para decidirse sobre oneway o twoway entrarían en juego cuestiones de eficiencia, fiabilidad, retrasos, etc. En la parte de IceStorm del manual detallan estos aspectos.