Programación Distribuida y su Aplicación Bajo Internet
Marzo 2004
Autor: Juansa SendraCada vez más aplicaciones requieren simultáneamente alta disponibilidad y distribución geográfica. El diseño e implementación de sistemas distribuidos tolerantes a fallos es una tarea compleja. Si además se trata de sistemas de gran tamaño (ej en telecomunicaciones) se requieren herramientas adecuadas.
Ericsson apostó por la segunda opción (hard. normal + soporte soft), pero diseñó un lenguaje específico para la construcción de aplicaciones distribuidas tolerantes a fallos con concurrencia masiva. Dicho lenguaje constituye una plataforma de distribución de alto nivel, y se denomina Erlang.
Los sistemas de telecomunicaciones distribuidos son muy complejos. Normalmente se implementaban utilizando alguna de las siguientes alternativas:
La justificación es la necesidad de mantener un elevado nivel de prestaciones
En cambio, un lenguaje simbolico:
Erlang es un lenguaje simbólico (concretamente funcional), diseñado para facilitar el desarrollo y mantenimiento de aplicaciones complejas.
Las características básicas de Erlang son:
{msg,Tipo,Texto}={msg,cambioModo,"modo edicion"}se realiza 'pattern matching' (indicado por =).
{..,..,..} representa una tupla (colección ordenada de datos), con valores constantes (ej.- tiras de caracteres entre "", valores numéricos, o símbolos -identificadores en minusculas-) o con variables (los identificadores con la inicial mayúscula corresponden a variables).
La unificación contrasta ambas tuplas e intenta comprobar si concuerdan (el número de elementos, y los elementos con valores constantes coinciden); si concuerdan, asigna a cada variable el valor correspondiente
-module(ej).
-export([fac/1]). %fac requiere un parametro
fac(0)->1;
fac(N)-> N*fac(N-1).[1,2,3] [A|Resto] []
[x*x || x<-[1,2,3,4]]
{2,color,X} -record (persona, {nombre, sexo, edad}).
#persona{nombre="Pepe", sexo=varon, edad=23}.X Emisor Msg
Una funcion
len([]) -> 0;
len([H|T]) -> 1+len(T).
fun (vars) = expr end
case F of
{circulo, X, Y, R} -> dibujaCirculo(R);
{rectangulo, X, Y, W, H} -> dibujaRect(X,Y,W,H);
{linea, X1, Y1, X2, Y2} -> dibujaLinea(X1,Y1,X2,Y2)
end
fac(0)->1;
fac(N)->N*fac(N-1).
fib(0)->1;
fib(1)->1;
fib(N)->fib(N-1)+fib(N-2).
igual([],[])->true;
igual([],[A|_])->false;
igual([A|_],[])->false;
igual([A|Ra],[B,Rb])->
if atom(A)->
if not atom(B)-> false
if A==B -> igual(Ra,Rb) else false;
elsif atom(B)-> false
else % A y B son listas
igual(A,B) and igual(Ra,Rb).(ej.- inv([1,2,3]) es igual a inv([2,3])++1, que a su vez es inv([3])++2++1, que genera inv([])++3++2++1, que resulta en []++3++2++1, que es [3,2,1])
inv([])->[];
inv([A|R])->inv(R)++A.
%fib0 es una funcion auxiliar
fib0(0,A,B)->A; % en A el valor de fib(N-1), en B el de fib(N-2)
fib0(N,A,B)->Fib0(N-1,A+B,A).
fib(N)->fib0(N,1,0). % esta es la función que exportamosNOTA.- usamos las BIF spawn, register, whereis, self, node y las primitivas !, receive
Pid=spawn(Nodo,Modulo,Funcion,Args)
Pid=spawn(Modulo,Funcion,Args)
Modulo:funcion(Args)
Pid ! Mensajedonde Mensaje puede ser cualquier término
receive
Patron1 [when Guarda1] -> Accion1
...
PatronN [when GuardaN] ->AccionN
[after Time -> AccionT]
endNOTA.- usamos las BIF exit, link, process_flag, monitor_node y la primitiva catch
fenix(Mod,Fun,Args) -> %mantiene siempre vivo un proceso
process_flag(trap_exit, true),
link(spawn(Mod, Fun, Args)),
receive {'EXIT', Pid, Razon} -> fenix(Mod,Fun,Args) end.NOTA.- usamos la BIF apply
apply(Mod,Fun,Args)
apply(Fun,Args)permiten ejecutar Mod:Fun(Args), aunque Mod y Fun se calculen durantela ejecución.
tc(Mod,Func,args) devuelve {duracion,result}
-module(rpc).
start() -> register(rpc, spawn(rpc,loop,[])).
loop() ->
receive {Cliente, {apply, M, F, Args}} ->
%proceso para servir esta peticion
spawn(rpc, reply, [Cliente, M, F, Args]),
loop() %queda esperando nuevas peticiones
end.
reply(Cliente,Mod,Fun,Args) ->
Cliente ! {rpc, node(), catch apply(Mod,Fun,Args)}.donde asumimos que todos los nodos evaluan rpc:start() cuando arrancan. De forma similar podemos definir una versión multi-nodo de RPC, de forma que multi:call(Nodos,M,F,Args) evalua la función M:F(args) en todos los nodos indicados en el primer param. (devuelve una tupla {Respuestas,Errores}, donde Repuestas es una lista de respuestas, y Errores una lista de nodos que han fallado)
-modulo(multi).
-export([call/4]).
call(Nodos, Mod, Func, Args) ->
send(Nodos, Mod, Func, Args),
collect(Nodos, [], []).
send([], _, _, _) -> ok
send([N|Resto], Mod, Func, Args) ->
monitor_node(N,true),
{rpc,N}!{self(), {apply, Mod, Func, Args}},
send(Resto,Mod,Func,Args).
collect([], Respuestas, Mal) -> {Respuestas,Mal};
collect([N|Resto], R, M) ->
receive
{rpc, Nodo, Resul} ->
monitor_node(Nodo,false),
collect(Resto, [Resp|R], M);
{nodedown, Nodo} -> collect(Resto, R, [Nodo|M])
end.Esta función permite invocar cualquier función en cualquier nodo (la función no sabe que se está invocando vía RPC). Ej asumiendo una función hw:poll(Disp) para consultar el estado de un determinado dispositivo podemos invocar multi:call(Nodos, hw, poll, [Disp]), donde Nodos puede ser cualquier lista de nodos (y por ejemplo incluir el nodo local). El RPC tradicional es el caso trivial en el que la lista de nodos es un singleton
Diseño tradicional de sist. distribuidos -> básicamente top-down, centrándose en los interfaces entre los distintos procesos (los interfaces deben especificarse al principio). Se usa un compilador que traduce las especificaciones de interfaz en las rutinas que permiten codificar/decodificar los mensajes de red (marshalling). -> interfaces estáticos, no pueden cambiarse en ejecución (hay que recompilar)
Pero la función multi:call permite invocar cualquier función en cualquier nodo -> la invoación de cualqueir función es independiente de si se invocará de forma local o remota.
Tipos de interfaces de un sistema Erlang típico:
a) funciones.- un módulo exporta un cto de funciones
b) basados en mensajes.- muchos procesos y servidores (ej rpc) sólo exportan un interfaz de mensajesNinguno de estos interfaces requiere una preparación especial para usarlos en un entorno distribuido. Un servidor puede recibir cualquier mensaje independientemente de la ubicación del emisor, y las funciones pueden invocarse local o remotamente.
Ej.- suponemos que pmap(Funcion,Coleccion) aplica la funcion a cada elemento de la colección (ej. lista o arbol), devolviendo una nueva colección
pmap(F, {lista, Datos}) -> {lista, lists:map(F,Datos)};
pmap(F, {arbol, Datos}) -> {arbol, arbol:map(F,Datos)};
pmap(_, {Otra, _}) -> {error, Otra}.Con ella podemos marshall en ejecución cualquier tipo de lista o arbol
Todo sist. distribuido heterogéneo debe abordar el problema de marshalling y unmarshalling estructuras de datos de forma eficiente, independiente del lenguaje y la plataforma hard.
Los lenguaje simbólicos suelen asociar tipo a los valores, no a las variables (ej para que el garbage-collector actúe según la marca de tipo asociada). Estos lenguajes permiten realizar marshalling eficiente durante la ejecución sin la ayuda de un lenguaje de descripción de tipos (permite definir un formato genérico para la representación externa, en el que podemos guardar/recuperar cualquier término.
En Erlang, y para cualquier término T, existe un mapping f tal que
f(T) = X (representación externa), e (inversa f)(X) = TCuando un proceso envía un mensaje, y el soporte en ejecución descubre que el destinatario reside en otro nodo, aplica f sobre el mensaje para empaquetarlo antes de remitirlo. El soporte en ejecución del nodo receptor aplica la inversa de f para reconstruir el mensaje (f y su inversa están implementadas mediante código C optimizado no recursivo)
Codificación de Pids.- un Pid siempre incorpora información que identifica el nodo donde se originó -> los convierte en 'location transparent' -> el programador puede usar un Pid sin saber si corrensponde a un nodo local o remoto.
Métodos para representación externa de los datos:- ignorar el problema (enviar el contenido de la memoria). Eficiente, reduce drásticamente interioperabilidad- ascii. Portable, fácil de depurar y comprender. Muy ineficiente- uso de un lenguaje de descripción de interfaces y un compilador de stubs.Es la solución tradicional (ej CORBA). Dede decidirse en compilación qué estructuras enviar por la red. Dificulta el uso de estructuras dinámicas (ej.- árboles o listas hash). Pasos
a) define todos los interfaces en un lenguaje especifico
b) ejecuta el compilador RPC para generar el código stub
c) enlaza el programa cliente, stubs cliente, y soporte en ejecución
en un módulo cargable por el cliente
d) enlaza el programa servidor, servidor de stubs y soporte de ejecución
en un módulo cargable por el servidor- El intérprete codifica/decodifica los datos en ejecución según su tipo. Puede codificarse dinámicamente cualquier estructura de datos. No requiere lenguaje de descripción de interfaces. A priori mucho más ineficiente que la aproximación tradicional, pero los resultados empíricos demuestran lo contrario
Asumimos gran cantidad de datos replicados en RAM. Ventaja de usar un lenguaje con GC para implementar sistemas non-stop -> el soporte en ejecución evita que nos quedemos sin memoria, y el GC puede mantener la fragmentación de memoria en límites razonables.
El gestor de memoria ideal debe cubrir (muy dificil):- GC rápido- paradas para GC por debajo de un umbral- evita fragmentación externa e interna- buen uso de la memoria- Muchos datos replicados son muy estables (lectura frecuente, gran tamaño, actualización infrecuente). GC debe gestionarlos bien
Erlang consigue esas caracteristicas combinando memoria normal y diccionarios residentes en memoria (implementados como listas hash lineales, tratadas por el GC de forma especial).
Cuando llega un estimulo externo se localiza de forma eficiente el objeto de datos asociado. El diccionario de actualiza de forma destructiva (frente a la semántica write-once del resto de Erlang). Otros lenguajes funcionales intentan resolver el mismo problema de forma más 'pura' (monads en Haskell y tipos unicos en Clean).
El uso de diccionarios simplifica determinados tipos de programas. Ej. contar la frecuencia de aparición de cada palabra en un texto grande es muy complejo con un esquema declarativo. Pero con diccionarios:
wcount(F) -> wcount(file:open(F,read), dict:new()).
wcount(Fd, Dict) ->
case get_word(Fd) of
{ok, Word} ->
Old = dict:lookup(Dict,Word),
dict:insert(Dict, {Word,add(Old)}),
wcount(Fd,Dict);
eof -> file:close(Fd), Dict
end.
add(undefined) -> 1;
add({Word,Count}) -> Count+1.Diccionario = base para gestión de datos eficiente en un sist distribuido
Son el mecanismo que permite gestionar datos replicados de forma coherente y mantenible. Erlang ideal para implementar gestores de transacciones (TM)
Muchos proyectos desarrollan TMs especificos (con conocimiento semántico sobre la aplicación). Las aplicaciones típicas en telecomunicaciones poseen restricciones de tiempo real (normalmente soft), y no pueden basarse en el mecanismo habitual de reservas (en ocasiones importa más una respuesta a tiempo que la consistencia entre las réplicas de un objeto).
Implementación eficiente de estructuras de datos persistentes. La escritura en disco plantea problemas similares a la escritura de datos en red, y la discusión de soluciones (interfaces estáticos frente a interfaces dinámicos) similar.
Ej.- un elemento habitual es un log
-module(log).
start() -> register(log, spawn(log,go,[])).
go() ->
Fd = file:open("LOG", read_write),
file:position(Fd,eof),
loop(Fd).
loop(F) ->
receive {From, {log,Term}} ->
B=term_to_binary(Term),
file:write(F,B),
From ! {log,logged},
loop(F)
end. log(Term) ->
log ! {self(), {log, Term}},
receive {log, logged} -> logged end.existe la BIF inversa binary_to_term
Todo ello sirve de base para la implementación de sistemas tolerantes a fallos en Erlang
NOTA.- usa BIF size
-module(log).
-author(klacke).
-export([start/0, start/1, stop/0, log/1, upread/1, truncate/0]).
% start/stop para arrancar y parar el logger
% truncate trunca el log, log añade un termino al final del log
% upread(F) aplica F sobre cada uno de los elems del log
-define(LOGFILE, 'slog.log'). start() -> start(?LOGFILE).
start(F) ->
case whereis(?MODULE) of
undefined -> register(?MODULE, spawn(?MODULE, loop0, [F]));
Pid -> true
end.
stop() -> req(stop).
req(R) ->
?MODULE ! {self(), R},
receive {?MODULE, Reply} -> Reply end. loop0(NomFitxer) ->
case file:open(NomFitxer,[read, write, raw, binary]) of
{ok, Fd} ->
{ok,Eof} = file:position(Fd, eof),
file:position(Fd, bof),
FilePos = position_fd(Fd,0),
maybe_warn(FilePos,Eof),
loop(Fd);
{error, Reason} -> exit(Reason)
end.
maybe_warn(Eof, Eof) -> ok;
maybe_warn(FilePos, Eof) -> warn("~w bytes truncated \n",[Eof-FilePos]). %entero a lista de 4 bytes y viceversa
i32(B) when binary(B) -> i32(binary_to_list(B,1,4));
i32([A,B,C,D]) -> (A bsl 24) bor (B bsl 16) bor (C bsl 8) bor D;
i32(I) when integer(I) ->
[(I bsr 24) band 255, (I bsr 16) band 255, (I bsr 8) band 255, I band 255]. %lee entero desde fichero
getInt32(F) -> {ok,B} file:read(F,4), i32(B). position_fd(Fd, LasPos) -> %devuelve ultima posicion ok
case catch getInt32(Fd) of %entero de 4 bytes como longitud
Int when integer(Int) -> %ultima pos ok
case file:read(Fd,Int) of
{ok,B} when
_ -> file:position(Fd,LastPos), file:truncate(Fd), LastPos
end. loop(Fd) ->
receive
{From, {log,Bin}} -> From!{?MODULE, log_binary(Fd,Bin)};
{From, {upread, Fun}} -> From!{?MODULE, upread(fd,Fun)};
{From, truncate} ->
file:position(Fd,bof), file:truncate(Fd), From!{?MODULE,ok};
{From, stop} ->
file:close(Fd), From!{?MODULE,stopped}, exit(normal)
end,
loop(Fd).
log_binary(Fd,Bin) ->
Sz=size(Bin),
case file:write(Fd, [i32(Sz),Bin] of %escribe [[A,B,C,D],Bin] en fichero
ok->ok;
{error,Reason} ->
warn("Can't write logfile ~p ",[Reason]), {error,Reason}
end. upread(Fd, Fun) ->
{ok,curr} = file:position(Fd, cur),
file:position(Fd,bof),
upread(Fd, catch get_term(Fd), Fun). upread(Fd, {'EXIT', _}, Fun) -> ok;
upread(Fd, Term, Fun) ->
Fun(Term), upread(Fd, catch get_term(Fd), Fun). get_term(Fd) -> {ok,B}=file:read(Fd,getin32(Fd)), binary_to_term(B). %funciones para que el cliente acceda al servidor
upread(Fun) -> req({upread,Fun}).
truncate() -> req(truncate).
log(Term) -> req({log, term_to_binary(Term)}).El esquema de funcionamiento es similar al de Java. El compilador traduce las instrucciones de alto nivel a un código interno que se ejecuta en un evaluador Erlang (concepto similar a la máquina virtual Java). El evaluador (también denominado emulador) está disponible para distintas plataformas, por lo que tenemos las mismas caracteristicas de portabilidad que con Java.
El soporte en ejecución (ERTS Erlang Runtime System) consiste en el evaluador y un conjunto de bibliotecas. ETS, junto a un conjunto de utilidades y componentes, y un conjunto de principios de diseño, constituyen la plataforma OTP (Open Telecom Platform), tambien llamada Erlang/OTP.
Una de las utilidades es el 'Erlang shell', un intérprete de órdenes similar al de los sistemas operativos, pero capaz de:
Además existen otras herramientas gráficas (Debugger, Pman (trazas), TV, etc)
En Windows el shell se invoca como 'erl'. En línea de comandos podemos introducir cualquier expresión Erlang (ej.- 1 + 2.), invocar cualquier BIF (ej date().), invocar cualquier función (ej.- math:factorial(40).), asociar/destruir variables, manipular tareas concurrentes, o introducir comandos
cd(directorio). %cambia al directorio indicado
c(modulo). %compila el módulo indicado
halt(). %finaliza la ejecución del shell.
%Control-c y Control-g también permite salir
help(). %muestra los comandosEn un sistema Erlang distribuido, 'nodo' significa un entorno de ejecución Erlang que puede comunicar con otros entornos similares (potencialmente en máquinas distintas)
Cada nombre tiene un nombre asignado (se puede obtener con la BIF node()). El nombre es un átomo (consiste en identificador@dominio), y se garantiza que es globalmente único (ej.- willy@gofre.dsic.upv.es). Desde dentro del dominio podemos usar solo el identificador.
Los nodos Erlang están protegidos mediante un sistema denominado 'magic cookie system':
Los cookies se asignan a nodos como sigue:
La biblioteca Erlang/OTP permite arrancar o controlar nodos esclavos (sólo bajo Unix, ya que se arrancan con el comando Unix 'rsh'). Para utilizar la orden, el usuario debe tener permiso para acceder al host remoto con rsh sin que se le pida una clave de acceso.
El sistema Erlang/OTP estandard puede configurarse para cambiar el comportamiento durante el arranque
Durante el arranque de Erlang/OTP, el sistema busca un fichero denominado '.erlang' en el directorio desde el que se lanza la aplicación (si no se encuentra, busca también en el directorio base del usuario)
Se asume que '.erlang' contiene expresiones Erlang válidas. Dichas expresiones se evalúan como si se hubiesen introducido desde consola. Por ejemplo, normalmentre se definen paths de búsqueda:
io:format("executing user profile in HOME/.erlang\n",[]).
code:add_path("/home/calvin/test/ebin").
code:add_path("/home/hobbes/bigappl-1.2/ebin").
io:format(".erlang rc finished\n",[]).Un proceso maestro monitoriza el comportamiento de la aplicación:
Servidor que actúa como Home Location Register (HLR) (un subconjunto más simple que denominamos Very Simple HLR (VSHLR).
Diseñamos tres módulos, todos con las funciones:
start() % arranca el servidor
stop() % para el servidor
iAmAt(Persona, Posicion) % indica al servidor 'La persona "Persona" esta en "Posicion"Tenemos un servidor central y un número arbitrario de clientes. Un cliente inicia una interacción (query) y el servidor responda (respuesta). Normalmente se usa el modelo cliente/servidor cuando es necesario arbitrar el acceso a recursos. Distintos clientes comparten un recurso, y el servidor gestiona dicho recurso.
Ignorando de momento cómo arrancar y parar el servidor, e ignorando posibles errores, podemos describir el servidor como una función f. El estado interno del servidor corresponde a la variable S, y el servidor recibe una petición Q; el servidor devuelve la respuesta R y cambia su estado interno a S'
(R,S') = f(Q,S)Vemos el código:
-module (servidor). %servidor generico
-export ([start/3, stop/1, loop/3, call/2]).
start(Nom, F, State) -> register(Nom, spawn(server,loop,[Name,F,State])).
stop(Nom) -> exit(Whereis(Nom),kill).
call(Nom,Query) -> Nom!{self(),Query},
receive {Nom,Reply} -> Reply end.
loop(Nom,F,State) ->
receive {Pid, Query} ->
{Resp, State1} = F(Query,State),
Pid ! {Nom, Resp},
loop(Nom, F, State1)
end. -module (vshlr1). %servidor vshlr, primera version
-export ([start/0, stop/0, iAmAt/2, find/1, handleEvent/2]).
start() -> server:start(xx1,
fun(Event,State)-> handle_event(Event,State) end,
[]).
stop(Nom) -> server:stop(xx1).
iAmPerson(Person,Position)->server:call(xx1,{iAmAt,Person,Position}).
find(Person) -> server:call(xx1,{find,Person}).
handleEvent({iAmAt,Person,Position},State) ->
Sate1 = updatePosition(Person,Position,State), {ok,State1};
handleEvent({find,Person},State) ->
Location = lookup(Person,State), {Location,State}. update_position(Key, Value, [{Key, _}|T]) -> [{Key, Value}|T];
update_position(Key, Value, [H|T]) -> [H|update_position(Key, Value, T)];
update_position(Key, Value, []) -> [{Key,Value}].
lookup(Key, [{Key, Value}|_]) -> {at, Value};
lookup(Key, [_|T]) -> lookup(Key, T);lookup(Key, []) -> lost.