Después de presentar ElasticSearch, es el turno de Logstash, un recolector de datos distribuidos. Logstash fue diseñado para recopilar datos provenientes de ficheros de logs (p. ej. logs de servidores Apache, Tomcat, etc.) con el fin de pre-procesarlos y enviarlos a un sistema de almacenamiento, como es el caso de ElasticSearch. Sin embargo, la herramienta ha evolucionado mucho y ya existen multitud de plugins que nos permiten obtener datos desde fuentes de datos de lo más variado (p. ej. Twitter, ficheros, bases de datos, servicios,...), pre-procesar dichos datos aplicando filtros y redirigir los datos (p. ej. hacia repositorios de datos, sistemas de visualización, sistemas de colas, etc.)


En esta entrada veremos un ejemplo de ingesta de datos desde un fichero CSV hacia Elsticsearch mediante Logstash. Para ello, utilizaremos un fichero con valores de cotización en bolsa de las compañías Yahoo y Google durante el año 2015 que puedes descargar aquí. Si te apetece obtener datos de alguna otra compañía, puedes obtener un fichero CSV similar en este enlace.

Para poder seguir esta entrada, necesitarás las siguientes herramientas:

Flujo de trabajo con Logstash

El flujo de trabajo con Logstash se basa en la definición de entradas, filtros y salidas de datos. En las entradas (input), se define la fuente de datos desde donde obtener los datos. En los filtros (filter) se definen operaciones y transformaciones sobre los datos, y en las salidas (output) se define el destino de dichos datos. Este flujo de datos se especifica en un fichero de configuración que tiene el siguiente aspecto.


Ejemplo con Logstash

Para realizar el ejemplo, tendremos que habilitar la plataforma Elasticsearch. Después, tendremos que descargar y descomprimir Logstash. A continuación, copiaremos en la carpeta bin del directorio de Logstash el fichero CSV que utilizaremos como fuente de datos de entrada (input), que puedes descargar aquí.

Ahora tendremos que crear el fichero de configuración que utilizaremos para leer los valores de las acciones, pre-procesarlos y enviarlos a Elasticsearch para su indexación.

En el bloque correspondiente al input hemos especificado las siguientes propiedades:
  • file: para configurar una entrada de datos proveniente de un fichero.
    • path: ruta donde se encuentra el fichero.
    • start_position: posición del fichero desde la que Logstash empezará a parsear datos.
    • type: tipo de documento que se creará en Elasticsearch cuando se indexen los datos.
    • sincedb_path: utilizado para que Logstash no tenga en cuenta dónde se quedó el cursor al parsear el fichero. Esta configuración viene bien para realizar pruebas, ya que de esta forma siempre podremos volver a indexar los datos del fichero. 
    • mutate: utilizado para mapear tipos de datos.
En el bloque correspondiente a los filtros hemos configurado lo siguiente:
  • csv: para mapear las columnas del fichero CSV a propiedades del documento JSON con unos nombres determinados.
  • date: para especificar la fecha proveniente de los datos de cotización de las acciones. Para ello, especificamos el patrón que siguen las fechas en el fichero CSV mediante la propiedad match, especificamos el locale y finalmente especificamos con target la propiedad del documento JSON de salida a la que se mapeará la fecha. Si no se configura un filtro de tipo date, Elasticsearch indexará cada documento con el timestamp en el que se recibió dicho documento.
Finalmente especificamos el output o salida de los datos mediante las siguientes configuraciones:
  • elasticsearch: donde especificamos el host donde está Elasticsearch y el índice que creará cuando se indexen los datos. Este índice se ha creado utilizando un patrón, de manera que se crea un índice por cada año perteneciente a los datos de cotización en bolsa. De esta forma se podrán borrar los documentos asociados a cada año borrando para ello su índice correspondiente.
  • stdout: mostrará en consola de comandos los datos parseados y enviados a Elasticsearch con el fin de comprobar que el proceso ha ido bien.


Una vez creado el fichero, deberemos situarlo en la carpeta bin de nuestro directorio de Logstash. Para empezar el proceso, tendremos que ejecutar Logstash, abriendo una consola de comandos en el directorio bin y escribiendo el siguiente comando, donde especificamos qué fichero de configuración queremos que Logstash utilice.

Cuando termine el proceso, podremos acceder al manager de Elasticsearch (http://localhost:9200/_plugin/kopf) para comprobar que los índices se han creado y que existen datos indexados. Concretamente, veremos que se han indexado 511 documentos nuevos en el índice logstash-bolsa-2015.


Conclusiones

Logstash es muy sencillo de configurar y dispone de multitud de plugins para realizar procesos de gestión de datos, lo que le convierte en una herramienta muy flexible. En la siguiente entrada utilizaremos Kibana para crear visualizaciones sobre los datos que hemos indexado con Elasticsearch.

Estamos en la era de los datos o el denominado Big Data. Disponemos de tal cantidad de datos, que los tradicionales sistemas de almacenamiento, gestión y procesamiento de los mismos ya no son válidos. Es por ello que en los últimos años han aparecido multitud de herramientas y frameworks para gestionar grandes cantidades de datos de forma eficiente. Una de estas herramientas es Apache Spark.



En esta entrada hablaré de este framework de procesamiento y análisis distribuido de datos, mostrando una introducción básica al mismo. En siguientes entradas mostraré más funcionalidades con ejemplos más complejos.

Para seguir esta entrada se necesitan las siguientes herramientas.

¿Qué es Apache Spark?

Spark es un framework de procesamiento y análisis distribuido de grandes cantidades de datos en memoria. Dispone de diversos módulos que permiten aplicar operaciones sobre datos, realizar consultas, procesamiento de grafos, stream processing y aplicación de algoritmos de machine learning de una forma eficiente y escalable. Al ser un entorno de computación en memoria, Spark puede conectarse a repositorios de datos externos como HBase, Cassandra, MongoDB, etc. donde residen datos persistentes.

Spark está escrito en Scala, un lenguaje de programación funcional soportado por la máquina virtual Java (JVM). De esta forma, el código Scala se compila y se convierte en bytecode Java, por lo que un programa escrito en Java es perfectamente interoperable con otro escrito en Scala.

El framework Spark dispone de una API para poder implementar aplicaciones de procesamiento y análisis de datos. Esta API se encuentra soportada por lenguajes de programación como Java o Python, aunque lo más natural es utilizar Scala, ya que es el lenguaje de programación que sustenta Spark.

Arquitectura de Spark

Los principales componentes de Spark son los que se describen a continuación:
  • Workers: son los nodos donde se ejecutan de forma distribuida las operaciones o tareas de las aplicaciones Spark.
  • Cluster manager: gestiona los workers. Spark dispone de su propio manager, aunque también soporta los gestores Mesos y YARN.
  • Driver program: es la aplicación principal. Las diferentes tareas que se implementen en esta aplicación serán las que se distribuyan por los workers del clúster para la realización en paralelo de las diferentes operaciones requeridas.

API de Spark

Antes de entrar en materia y poner un sencillo ejemplo de procesamiento de datos con Spark, es conveniente entender algunos conceptos básicos de la API.
  • SparkContext: facilita la conexión de la aplicación con un clúster Spark.
  • Resilient Distributed Dataset (RDD): un RDD es una colección de elementos que puede particionarse para ser tratados en paralelo. La API proporciona métodos para conectarse a diferentes repositorios de datos (Hadoop, Cassandra, etc.) y crear RDDs. La creación de un RDD es una operación "lazy", es decir, que Spark no realiza ninguna acción al momento, sino que realiza una programación de dicha operación hasta que ocurra alguna acción sobre los datos del RDD. Las operaciones que pueden realizarse sobre un RDD son transformaciones y acciones.
  • Transformaciones: una operación de transformación sobre un RDD devuelve un nuevo RDD al que se le aplica cierta operación. Por ejemplo, aplicar una determinada función a los elementos del RDD (p. ej. multiplicar por 2 cada elemento), filtrar los elementos en base a una condición determinada, etc.
  • Acciones: las acciones devuelven un valor a la aplicación o Driver program. Por ejemplo, contar los elementos de un RDD, calcular el máximo, el mínimo o guardar el contenido de un RDD en un fichero son ejemplos de acciones sobre un RDD.

Ejemplo con Apache Spark

Para empezar a trabajar con Spark necesitaremos descargar y descomprimir el IDE Scala, que está basado en Eclipse. Para lanzar el entorno ejecutaremos el archivo eclipse.exe. Después, descargaremos el proyecto con el código fuente del repositorio GitHub en este enlace.

Ahora veremos un ejemplo muy sencillo en el que filtraremos una lista de números para obtener los que sean pares. En primer lugar crearemos un objeto llamado NumerosPares que extienda de App (línea 6) dentro del fichero de nombre NumerosPares.scala. Con esto, estamos definiendo una nueva aplicación de Scala, que ejecutará el código contenido en el propio objeto.

Configuraremos una conexión a un clúster Spark, en este caso local, mediante el objeto SparkConf, que después utilizaremos para instanciar un objeto de tipo SparkContext. Después crearemos un RDD partiendo de una lista de 10.000 números. Sobre este RDD de nombre listaNumerosRdd, realizaremos una transformación mediante un filtro. El filtro aplicará una función para obtener un nuevo RDD que solamente contenga los números pares. La notación utilizada en la función mediante la barra baja "_" es equivalente a escribir { x => x % 2 == 0} (línea 15).

Finalmente aplicaremos una acción sobre el RDD, para contar el número de elementos pares que después visualizaremos por consola.


Para ejecutar la aplicación, nos posicionaremos con el ratón en el fichero NumerosPares.scala, pulsando con el botón derecho del ratón y seleccionando Run As -> Scala Application.

Conclusiones

Spark es un entorno con múltiples funcionalidades, que simplifica enormemente la gestión de grandes cantidades de datos de forma transparente. La magia de Spark reside en que el programa escrito en el ejemplo, puede ejecutarse tanto en un clúster local, como en un clúster distribuido compuesto por multitud de nodos sin cambiar ni una sola línea de código.

En siguientes entradas veremos ejemplos más completos de la API Spark para la gestión de grandes cantidades de datos, así como otros módulos ofrecidos por el framework como los de consulta de datos o el de aplicación de algoritmos de machine learning.

Pulsa aquí para acceder al código de esta entrada en mi repositorio GitHub
Hoy hablaré de Elasticsearch, un indexador de documentos que se está haciendo muy popular gracias al stack ELK (Elasticsearch, Logstash y Kibana). Estos tres componentes son bastante comunes en arquitecturas Big Data. Logstash facilita la ingesta de datos provenientes de fuentes distribuidas, Elasticsearch indexa los datos y Kibana permite su visualización. En esta entrada mostraré las funcionalidades básicas de Elasticsearch como motor de indexación.



Las herramientas necesarias para seguir este tutorial son:

¿Qué es Elasticsearch?

Elasticsearch es un motor de búsqueda basado en Apache Lucene que permite indexar grandes cantidades de datos para su posterior consulta de forma eficiente. Los datos o documentos que se indexan no necesitan tener una estructura determinada, aunque para un mejor funcionamiento y explotación de los mismos es recomendable su definición.

Como todo sistema orientado a Big Data, es fácilmente escalable y se puede configurar de forma distribuida. Dispone de una API RESTful para interactuar con la plataforma y los datos, un lenguaje de consulta llamado Query DSL y multitud de librerías en los principales lenguajes de programación (Java, PHP, etc.).


Wikipedia, Soundcloud o GitHub utilizan Elasticsearch para proporcionar búsquedas de información en tiempo real.

Componentes principales de Elasticsearch

A continuación se describen los componentes principales de Elasticsearch.
  • Índice: es una colección de documentos que comparten características similares. Cada índice puede contener múltiples tipos de documentos y cada tipo puede contener múltiples documentos. Por ejemplo, si creamos un índice llamado "Mi blog", podemos indexar documentos de tipo "post" o de tipo "comentario".
  • Documento: un documento se representa en formato JSON y se almacena en un índice. Cada documento tiene un tipo y un identificador único.
  • Mapeo: el mapeo se utiliza para realizar la correspondencia entre un campo de un documento JSON y su tipo de dato. Elasticsearch crea estos mapeos de forma automática al indexar un nuevo documento, aunque pueden ser asignados de forma manual también.
  • Shard: un shard es una entidad física donde se almacenan los datos para los distintos índices creados. Cada índice puede tener diversos shard, tanto primarios como secundarios (para respaldos, balanceo de carga, etc.). Los shard pueden estar distribuidos en los diferentes nodos que pueden componer un cluster Elasticsearch. Al iniciar una instancia de Elasticsearch (un nodo), se crea un cluster de nombre "elasticsearch" y se crean 5 shard primarios por cada índice.

Puesta en marcha de Elasticsearch

Después de instalar y configurar Java, deberemos descargar y descomprimir la distribución de Elasticsearch. Accederemos a la carpeta bin dentro del directorio donde hayamos descomprimido Elasticsearch y ejecutaremos el fichero elasticsearch.bat (en Windows).

Una vez iniciado, accederemos a la URL http://localhost:9200/?pretty con nuestro navegador para comprobar que funciona correctamente. Se mostrarán algunos datos básicos como el nombre del clúster y del nodo (como curiosidad, comentar que se asigna un nombre de nodo correspondiente a un personaje de Marvel...).



Ahora instalaremos el plugin kopf, para lo que abriremos una consola de comandos en el directorio bin de Elasticsearch y ejecutaremos el siguiente comando:


Una vez instalado, podremos acceder a la URL http://localhost:9200/_plugin/kopf donde veremos el cluster, los nodos, los shards y su estado.




Trabajando con documentos

Vamos ahora a indexar algunos documentos. A la hora de insertar un nuevo documento mediante la API REST, se sigue el patrón http://localhost:9200/<indice>/<tipo>/[<id>]. El índice se crea si no existe, así como el tipo. El identificador es opcional, y si no se proporciona Elasticsearch asignará uno de forma automática.

Por ejemplo, vamos a utilizar Elasticsearch para indexar nuestra biblioteca multimedia. Empezaremos por los libros, añadiendo un nuevo al índice llamado "multimedia". Realizaremos llamadas a la API mediante la extensión de Chrome Advanced Rest Client.

Indexaremos un nuevo libro realizando una petición PUT a la URL http://localhost:9200/multimedia/libro/1 enviando en el cuerpo del mensaje el siguiente documento JSON.


Podemos visualizar los índices contenidos en Elasticsearch para ver que se ha creado el nuevo índice correctamente, mediante la consulta GET http://localhost:9200/_cat/indices. Esta consulta devuelve un listado de índices en formato texto, donde veremos nuestro nuevo índice "multimedia".

Ahora vamos a realizar algunas búsquedas. Mediante la consulta GET http://localhost:9200/_search podemos visualizar todos los documentos indexados, para comprobar que nuestro libro se ha insertado correctamente.

Esta consulta de documentos, puede realizarse también en relación al índice o al tipo de documento. Por ejemplo, para obtener todos los libros se utilizaría la consulta siguiente http://localhost:9200/multimedia/libro/_search

Para realizar una búsqueda de un libro que contenga en cualquiera de sus campos un texto determinado, se puede utilizar la siguiente consulta de tipo POST http://localhost:9200/multimedia/_search En el cuerpo de la petición, enviaremos el término por el que queremos realizar la búsqueda. En este caso, vamos a buscar por la palabra "pampinoplas" enviando el siguiente JSON codificado mediante Query DSL en el cuerpo de la petición.



Si lo que queremos es buscar un libro por un campo determinado, se utilizaría la consulta POST http://localhost:9200/multimedia/_search enviando el siguiente JSON, donde se especifican los campos en los que realizar la búsqueda del término proporcionado.



Conclusiones

Elasticsearch es una excelente solución para la indexación de documentos, pensada para realizar búsquedas eficientes en grandes cantidades de datos. Es importante mencionar que Elasticsearch está concebida para realizar búsquedas rápidas, por lo que no soporta transacciones ni sus principios ACID.

El ciclo de vida de los datos suele ser de una única escritura en Elasticsearch y múltiples lecturas. Por ejemplo, puede utilizarse como soporte a bases de datos relacionales que sí soportan los principios ACID, pero que no disponen de sistemas de indexación de texto eficientes. De esta manera, se pueden replicar datos de forma asíncrona a una instancia de Elasticsearch para delegar en esta plataforma las búsquedas de texto.

Como ya he comentado, el poder de Elasticsearch aumenta en combinación con Logstash y Kibana, conformando el stack ELK. En siguientes entradas hablaré del resto de componentes de este stack y pondré algún ejemplo de uso.

Si necesitas más información sobre Elasticsearch, te recomiendo que te compres un par de libros muy completos, "Elasticsearch In Action" y "Elasticsearch: The Definitive Guide".

En esta entrada veremos la implementación del controlador y la fachada. El controlador gestionará las peticiones HTTP y hará uso de la clase fachada, que albergará la lógica de negocio de nuestro servicio. Esta última utilizará el repositorio de datos para realizar operaciones contra la base de datos.


Finalizaré la entrada con algunos ejemplos de consumo de la API implementada con un cliente REST y unas conclusiones sobre este primer tutorial básico de implementación de una API RESTful con Spring Framework.

Fachada

La clase fachada implementada es LibroFacade, y puesto que de momento nuestra lógica de negocio es muy sencilla, simplemente realiza llamadas al repositorio de datos. Hemos utilizado las siguientes anotaciones en la clase.
  • @Service (línea 12): utilizada con el fin de que Spring Framework sepa que es un componente y pueda gestionarlo como tal. 
  • @Autowired (línea 15): puesto que la clase hace uso del repositorio de datos (LibroJpaRepository), se ha realizado una inyección de dependencia mediante esta anotación. 
La clase dispone de métodos para:
  • Obtener todos los libros (línea 18). 
  • Obtener un libro por su identificador (línea 22).
  • Actualizar un libro (línea 26). 
  • Insertar un libro (línea 30).
  • Borrar un libro (línea 35). 

Controlador

La clase LibroController será el controlador que gestione las peticiones HTTP. Esta clase utiliza las siguientes anotaciones.
  • @RestController (línea 20): para informar a Spring de que la clase es un controlador. Quitar el nombre del controlador por ahora
  • @Autowired (línea 24): para la inyección de dependencia de la clase fachada.
  • @RequestMapping (p. ej. línea 37): indica la ruta que se deberá utilizar para llamar al servicio. Esta anotación se encuentra en la propia clase (ruta raíz del servicio /libros) y en los métodos para indicar las rutas concretas para esas peticiones respecto a la ruta raíz. Si la ruta se especifica mediante llaves, se indica que el valor será variable (por ejemplo, un GET a libros/1 o libros/2). Además se han utilizado las propiedades method y consumes para especificar el tipo de llamada HTTP que soportan y el tipo de dato que esperan consumir.
  • @RequestBody (p. ej. línea 38): indica la entidad a la que se mapearán los datos que lleguen en el cuerpo de la petición HTTP. En este caso se enviará un JSON representando al libro en cuestión. Para realizar el mapeo correctamente, el documento JSON deberá tener las mismas propiedades que la clase Libro.
  • @PathVariable (p. ej. línea 33): se utiliza para asignar a una variable el valor de la ruta especificado entre llaves. 
Las llamadas soportadas por el controlador son las siguientes:
  • GET /libros: obtiene todos los libros.
  • GET /libros/1: obtiene un libro por su identificador.
  • PUT /libros/1: actualiza un libro determinado.
  • POST /libros: crea un nuevo libro.
  • DELETE /libros/1: borra un libro por su identificador.
En el método de inserción de un libro (línea 44), se ha implementado un fragmento de código que incorpora en la cabecera de la respuesta el link al nuevo recurso creado (líneas 48 a 51). 


Consumo de la API

Para poder consumir nuestro servicio, podemos utilizar cualquier cliente REST. En este caso, vamos a utilizar Advanced Rest Client, que es una extensión de Chrome. En la siguiente animación, se muestra cómo se obtiene en primera instancia el listado de libros en formato JSON. Después se obtiene el libro número 1.

Posteriormente, se inserta un nuevo libro, proporcionando el JSON que representa el libro y especificando el tipo de dato que se envía en el cuerpo de la petición (Content-Type: application/json). Finalmente se elimina el libro insertado.



Los códigos de respuesta obtenidos son proporcionados de forma automática por el controlador de Spring Framework en base al tipo de operación que se realiza.

  • 200 - OK, es la respuesta estándar para las peticiones que se han servido correctamente.
  • 201 - Creado, se utiliza para informar de que un recurso se ha creado correctamente. 

Conclusiones

Y hasta aquí el ejemplo básico de implementación de una API RESTful con Spring Framework y JPA. Como se puede observar, Spring Framework simplifica mucho desarrollo de una API RESTful, evitando al programador la escritura de líneas redundantes que tienden a aumentar el número de errores. 

Además, los métodos básicos de gestión de recursos vienen ya proporcionados por Spring Framework mediante sus repositorios de datos. Estas características hacen de esta aproximación una buena elección de cara a implementar una API RESTful de forma sencilla, centrando las labores de programación, sobre todo, en la lógica de negocio del propio servicio.

En próximas entradas actualizaré el proyecto para incorporar nuevas funcionalidades con el fin de poder ver más características de Spring Framework y JPA, como por ejemplo, el control de excepciones, nuevas anotaciones, mapeos de relaciones entre entidades, etc.

Pulsa aquí para acceder al código de esta entrada en mi repositorio Github