ZeroC IceStorm: gestionando eventos
En esta receta se describe qué es IceStorm (el servicio de publicación-subscripción de ICE) junto con algunos ejemplos que muestran su funcionalidad.
Antecedentes
En Empezar con ZeroC Ice en Debian se describió cómo adentrarse en ZeroC Ice. Ahora llega el turno de conocer IceStorm, uno de los servicios que nos ofrece esta herramienta multiusos. Nota: Para probar los ejemplos de esta receta se ha utilizado la versión 3.2.1-3 de Ice y la versión 2.4.4 de Python.Conceptos básicos
IceStorm es un servicio eficiente de publicación-subscripción para aplicaciones Ice. Desde un punto de vista funcional, IceStorm actúa como mediador entre el publicador y el subscriptor, proporcionando varias ventajas:- Sólo es necesaria una llamada al servicio IceStorm para distribuir la información a los subscriptores.
- Independencia entre el emisor y los receptores de información, permitiendo que el primero se ocupe de las responsabilidades relativas a la aplicación y no de tareas administrativas.
- Los cambios introducidos en el código son mínimos para incorporar la funcionalidad de IceStorm.
Un ejemplo sencillo
A continuación se mostrará un ejemplo sencillo de cómo utilizar IceStorm. Dicho ejemplo consiste en una serie de objetos cuyo propósito es procesar una determinada estructura de datos. Para ello, se define el siguiente archivo a través de Slice (simple.ice):module Simple { struct Message { string content; }; interface Processor { void process (Message m); }; };Básicamente, disponemos de una estructura que contiene una cadena y una interfaz con una sencilla operación de procesado. Teniendo en cuenta la funcionalidad básica de IceStorm, con esta sencilla aproximación podríamos diseñar un sistema que permitiera procesar esta estructura por distintos servidores empleando una única llamada a IceStorm y de forma transparente para el publicador.
Creando el publicador
Veamos ahora el código del publicador (publisher.py):#!/usr/bin/python # -*- coding: utf-8 -*- import sys, Ice, IceStorm Ice.loadSlice('simple.ice', ['-I', '/usr/share/slice']) import Simple class Publisher (Ice.Application): def run (self, argv): # Proxy al TopicManager. prx = self.communicator().stringToProxy('Simple/TopicManager:tcp -p 10000') topicManagerPrx = IceStorm.TopicManagerPrx.checkedCast(prx) # Proxy al topic. try: topicPrx = topicManagerPrx.retrieve('MyTopic') except IceStorm.NoSuchTopic: topicPrx = topicManagerPrx.create('MyTopic') # Proxy al objeto que publica. pub = topicPrx.getPublisher() processorPrx = Simple.ProcessorPrx.uncheckedCast(pub) # Publicación. m = Simple.Message('content') processorPrx.process(m) return 0 Publisher().main(sys.argv)Como se puede apreciar, el código habla por sí mismo (incluso sin fijarnos en los comentarios). Ésta es una de las principales ventajas de utilizar Ice: hace que el crear aplicaciones distribuidas implique muy pocos cambios en el código de la aplicación. A nivel general, se llevan a cabo los siguientes pasos:
- Se crea un proxy a un objeto de tipo TopicManager.
- Se recupera un proxy al topic en cuestión.
- Se obtiene un proxy al publicador.
- Se invoca a la operación remota process.
Creando el subscriptor
Sencillo, ¿verdad? Pues el código del subscriptor (subscriptor.py) se mantiene en esa misma línea:#!/usr/bin/python # -*- coding: utf-8 -*- import sys, Ice, IceStorm Ice.loadSlice('simple.ice', ['-I', '/usr/share/slice']) import Simple class ProcessorI (Simple.Processor): def process (self, message, curr): print message.content class Subscriber (Ice.Application): def run (self, argv): self.shutdownOnInterrupt() # Proxy al TopicManager. prx = self.communicator().stringToProxy('Simple/TopicManager:tcp -p 10000') topicManagerPrx = IceStorm.TopicManagerPrx.checkedCast(prx) # Creación del objeto remoto. oa = self.communicator().createObjectAdapterWithEndpoints('SubscriberOA', 'default') processorPrx = oa.addWithUUID(ProcessorI()) # Proxy al topic. try: topicPrx = topicManagerPrx.retrieve('MyTopic') except IceStorm.NoSuchTopic: topicPrx = topicManagerPrx.create('MyTopic') # Subscripción. topicPrx.subscribeAndGetPublisher(None, processorPrx) oa.activate() self.communicator().waitForShutdown() topicPrx.unsubscribe(processorPrx) return 0 Subscriber().main(sys.argv)El código es prácticamente idéntico. Sin embargo, en este caso tenemos que crear un adaptador de objetos para albergar el sirviente y, posteriormente, registrar este último (encargado de encarnar al objeto remoto). La parte más importante es la relativa a la operación subscribeAndGetPublisher, la cual añade un proxy subscriptor al topic (processorPrx) sin tener en cuenta la calidad del servicio (None).
Ejecutando IceStorm
Llegados a este punto ya tenemos definidas estas sencillas versiones del publicador y el subscriptor. Ahora sólo queda ejecutar IceStorm y poner en funcionamiento el sistema. IceStorm es un servicio relativamente ligero que se implementa como un servicio IceBox. Por lo tanto, el primer paso consiste en arrancar IceStorm:IceBox.ServiceManager.Endpoints=tcp -p 9999 IceBox.Service.IceStorm=IceStormService,32:createIceStorm --Ice.Config=config.icestormSimplemente se informa a IceBox que se va a levantar una instancia del servicio IceStorm, con unas determinadas propiedades especificadas en el archivo de configuración asociado (config.icestorm):
IceStorm.InstanceName=Simple IceStorm.TopicManager.Proxy=Simple/TopicManager:default -p 10000 IceStorm.TopicManager.Endpoints=tcp -p 10000 IceStorm.Publish.Endpoints=tcp Freeze.DbEnv.IceStorm.DbHome=dbEn este archivo de configuración se especifica el nombre de la instancia de IceStorm, la dirección de escucha de un proxy del tipo TopicManager y el directorio utilizado por IceStorm para gestionar su persistencia a través de Freeze (servicio de persistencia de Ice). Ya sólo queda ejecutar el subscriptor (podemos ejecutar tantos como queramos para comprobar que todos reciben las invocaciones remotas):
Federación de topics
Una de las funcionalidades de IceStorm más interesantes es la posibilidad de establecer una jerarquía entre los distintos topics, creando un grafo dirigido en el que se establece el concepto de coste. En otras palabras, podemos establecer una jerarquía de topics de manera que los mensajes fluirán por los distintos enlaces que conectan los topics en función del coste asociado al mensaje por una parte, y al enlace por otra. Imaginemos un grafo en el que tenemos tres topics: A, B y C. A tiene un enlace con B de coste 1, mientras que A tiene un enlace con C de coste 2, es decir: A ----- c(1) -----> B A ----- c(2) -----> C Por otra parte, supongamos un subscriptor Sb vinculado a B, un subscriptor Sc vinculado a C y un publicador P vinculado a A. Si P envía un mensaje de coste 1, este mensaje llegará a A y fluirá a B y C debido a que el coste del mensaje es igual o menor que el de los enlaces, llegando por tanto a los subscriptores Sb y Sc. Sin embargo, si P envía un mensaje de coste 2, éste sólo llegará a Sc ya que el enlace existente entre A y B tiene un coste 1, por lo que el subscriptor Sb quedaría sin notificación y, por otra parte, Sc sí recibiría el mensaje. Merece la pena destacar que un mensaje de coste 0 fluye por todos los enlaces. Desplegar esta jerarquía de topics en IceStorm es muy sencillo con la herramienta de administración de IceStorm:create A B C link A B 1 link A C 2Con este sencillo archivo de texto creamos los topics y los enlaces entre ellos. Ahora debemos adaptar el publicador y el subscriptor incluyendo esta nueva funcionalidad. El publicador sólo necesita vincularse al topic A y asociar el coste al mensaje. Para ello se utiliza el argumento implícito de contexto que toda operación de un proxy conlleva. En Python es muy simple:
.... # Proxy al topic. try: topicPrx = topicManagerPrx.retrieve('A') except IceStorm.NoSuchTopic: topicPrx = topicManagerPrx.create('A') .... # Publicación. m = Simple.Message('content') ctx = {} ctx['cost'] = '2' processorPrx.process(m, ctx) ....En el subscriptor podemos añadir la funcionalidad al sirviente para que imprima el coste:
class ProcessorI (Simple.Processor): def process (self, message, curr): print 'Message: ' + message.content print 'Cost: ' + curr.ctx.get('cost')Y para probar nuestro ejemplo con los dos subscriptores, utilizaremos una simple comparación:
.... # Proxy al topic. try: if argv[1] == 'B': topicPrx = topicManagerPrx.retrieve('B') else: topicPrx = topicManagerPrx.retrieve('C') except IceStorm.NoSuchTopic: pass # Subscripción. ....Para probar este sencillo ejemplo basta con seguir los siguientes pasos:
david@TheSecondWorldHeaven:~/Universidad/ZeroC_ICE/src/icestorm/topicFederation$ python subscriber.py C Message: content Cost: 2
Conclusiones
Con IceStorm podemos gestionar el envío y la recepción de eventos de una manera sencilla, flexible y eficiente. La conclusión más importante que podemos obtener es la ventaja que supone el hecho de poder centrarte en la aplicación en cuestión, minimizando el tiempo empleado en cuestiones ajenas a la naturaleza de la misma y obteniendo un sistema eficiente. En particular, la federación de topics ofrece características muy interesantes a la hora de crear aplicaciones distribuidas, permitiendo jugar con la distribución de eventos a nuestro antojo.¿Y ahora qué?
Por supuesto, todo este despliegue puede ser más sencillo y más transparente si utilizamos IceGrid tanto para desplegar la instancia de IceStorm como para localizar los distintos objetos involucrados en la aplicación. Además, es posible crear un sistema totalmente dinámico que gestione la creación y destrucción de topics en función de la naturaleza de nuestra aplicación. Por otra parte, IceStorm permite añadir más funcionalidades descritas en profundidad en el manual de Ice. Todo eso queda para una futura receta.Referencias
- Manual online de ZeroC Ice.
- Documentación online de IceStorm.
- Documentación de referencia online de Slice.
[ show comments ]
blog comments powered by Disqus