ELK Stack (II): Procesamiento de datos con Logstash

, , No Comments
Después de presentar ElasticSearch, es el turno de Logstash, un recolector de datos distribuidos. Logstash fue diseñado para recopilar datos provenientes de ficheros de logs (p. ej. logs de servidores Apache, Tomcat, etc.) con el fin de pre-procesarlos y enviarlos a un sistema de almacenamiento, como es el caso de ElasticSearch. Sin embargo, la herramienta ha evolucionado mucho y ya existen multitud de plugins que nos permiten obtener datos desde fuentes de datos de lo más variado (p. ej. Twitter, ficheros, bases de datos, servicios,...), pre-procesar dichos datos aplicando filtros y redirigir los datos (p. ej. hacia repositorios de datos, sistemas de visualización, sistemas de colas, etc.)


En esta entrada veremos un ejemplo de ingesta de datos desde un fichero CSV hacia Elsticsearch mediante Logstash. Para ello, utilizaremos un fichero con valores de cotización en bolsa de las compañías Yahoo y Google durante el año 2015 que puedes descargar aquí. Si te apetece obtener datos de alguna otra compañía, puedes obtener un fichero CSV similar en este enlace.

Para poder seguir esta entrada, necesitarás las siguientes herramientas:

Flujo de trabajo con Logstash

El flujo de trabajo con Logstash se basa en la definición de entradas, filtros y salidas de datos. En las entradas (input), se define la fuente de datos desde donde obtener los datos. En los filtros (filter) se definen operaciones y transformaciones sobre los datos, y en las salidas (output) se define el destino de dichos datos. Este flujo de datos se especifica en un fichero de configuración que tiene el siguiente aspecto.


Ejemplo con Logstash

Para realizar el ejemplo, tendremos que habilitar la plataforma Elasticsearch. Después, tendremos que descargar y descomprimir Logstash. A continuación, copiaremos en la carpeta bin del directorio de Logstash el fichero CSV que utilizaremos como fuente de datos de entrada (input), que puedes descargar aquí.

Ahora tendremos que crear el fichero de configuración que utilizaremos para leer los valores de las acciones, pre-procesarlos y enviarlos a Elasticsearch para su indexación.

En el bloque correspondiente al input hemos especificado las siguientes propiedades:
  • file: para configurar una entrada de datos proveniente de un fichero.
    • path: ruta donde se encuentra el fichero.
    • start_position: posición del fichero desde la que Logstash empezará a parsear datos.
    • type: tipo de documento que se creará en Elasticsearch cuando se indexen los datos.
    • sincedb_path: utilizado para que Logstash no tenga en cuenta dónde se quedó el cursor al parsear el fichero. Esta configuración viene bien para realizar pruebas, ya que de esta forma siempre podremos volver a indexar los datos del fichero. 
    • mutate: utilizado para mapear tipos de datos.
En el bloque correspondiente a los filtros hemos configurado lo siguiente:
  • csv: para mapear las columnas del fichero CSV a propiedades del documento JSON con unos nombres determinados.
  • date: para especificar la fecha proveniente de los datos de cotización de las acciones. Para ello, especificamos el patrón que siguen las fechas en el fichero CSV mediante la propiedad match, especificamos el locale y finalmente especificamos con target la propiedad del documento JSON de salida a la que se mapeará la fecha. Si no se configura un filtro de tipo date, Elasticsearch indexará cada documento con el timestamp en el que se recibió dicho documento.
Finalmente especificamos el output o salida de los datos mediante las siguientes configuraciones:
  • elasticsearch: donde especificamos el host donde está Elasticsearch y el índice que creará cuando se indexen los datos. Este índice se ha creado utilizando un patrón, de manera que se crea un índice por cada año perteneciente a los datos de cotización en bolsa. De esta forma se podrán borrar los documentos asociados a cada año borrando para ello su índice correspondiente.
  • stdout: mostrará en consola de comandos los datos parseados y enviados a Elasticsearch con el fin de comprobar que el proceso ha ido bien.


Una vez creado el fichero, deberemos situarlo en la carpeta bin de nuestro directorio de Logstash. Para empezar el proceso, tendremos que ejecutar Logstash, abriendo una consola de comandos en el directorio bin y escribiendo el siguiente comando, donde especificamos qué fichero de configuración queremos que Logstash utilice.

Cuando termine el proceso, podremos acceder al manager de Elasticsearch (http://localhost:9200/_plugin/kopf) para comprobar que los índices se han creado y que existen datos indexados. Concretamente, veremos que se han indexado 511 documentos nuevos en el índice logstash-bolsa-2015.


Conclusiones

Logstash es muy sencillo de configurar y dispone de multitud de plugins para realizar procesos de gestión de datos, lo que le convierte en una herramienta muy flexible. En la siguiente entrada utilizaremos Kibana para crear visualizaciones sobre los datos que hemos indexado con Elasticsearch.

0 comentarios:

Publicar un comentario