Wed. Sep 28th, 2022

Modelado escalable de series temporales con proyectos de código abierto StatsForecast, Fugue y Spark

Por Kevin Kho, Han Wang, Max Mergenthaler y Federico Garza Ramírez.TL: DR Le mostraremos cómo puede aprovechar el poder distribuido de Spark y el código altamente eficiente de StatsForecast para adaptarse a millones de modelos en un par de minutos.El modelado, el análisis y la predicción de series temporales de tendencias y estacionalidades para los datos recopilados a lo largo del tiempo es una categoría de aplicaciones de software en rápido crecimiento. Las empresas, desde electricidad y economía hasta análisis de atención médica, recopilan datos de series temporales diariamente para predecir patrones y crear mejores datos -experiencias de producto impulsadas. Por ejemplo, la predicción de la temperatura y la humedad se utiliza en la fabricación para evitar defectos, las predicciones de métricas de transmisión ayudan a identificar a los artistas musicales más populares y la previsión de ventas de miles de SKU en diferentes ubicaciones de la cadena de suministro se utiliza para optimizar los costos de inventario. A medida que aumenta la generación de datos, las necesidades de pronóstico han evolucionado desde el modelado de unas pocas series de tiempo hasta la predicción de millones. Nixtla es un proyecto de código abierto centrado en el pronóstico de series de tiempo de última generación. Tienen un par de bibliotecas como StatsForecast para modelos estadísticos, NeuralForecast para aprendizaje profundo y HierarchicalForecast para agregaciones de pronósticos en diferentes niveles de jerarquías. Estas son bibliotecas de series de tiempo listas para producción enfocadas en diferentes técnicas de modelado. Este artículo analiza StatsForecast, una biblioteca de pronóstico ultrarrápida con modelos estadísticos y econométricos. El modelo AutoARIMA de Nixtla es 20 veces más rápido que pmdarima, y ​​los modelos ETS (error, tendencia, estacional) se desempeñaron 4 veces más rápido que los modelos estadísticos y son más robustos. Los puntos de referencia y el código para reproducir se pueden encontrar aquí. Una gran parte del aumento del rendimiento se debe al uso de un compilador JIT llamado numba para lograr altas velocidades. El tiempo de iteración más rápido significa que los científicos de datos pueden ejecutar más experimentos y converger en modelos más precisos más rápido. También significa que la ejecución de puntos de referencia a escala se vuelve más fácil. En este artículo, estamos interesados ​​en la escalabilidad de la biblioteca StatsForecast para ajustar modelos sobre Spark o Dask usando la biblioteca Fugue. Esta combinación nos permitirá entrenar rápidamente una gran cantidad de modelos distribuidos en un clúster temporal. Cuando se trata de grandes datos de series de tiempo, los usuarios normalmente tienen que lidiar con miles de series de tiempo lógicamente independientes (piense en la telemetría de diferentes usuarios o diferentes ventas de productos). ). En este caso, podemos entrenar un modelo grande sobre todas las series, o podemos crear un modelo para cada serie. Ambos son enfoques válidos ya que el modelo más grande captará tendencias en toda la población, mientras que entrenar miles de modelos puede ajustarse mejor a los datos de series individuales. biblioteca Nixtla HierarchicalForecast, pero esto también es más costoso desde el punto de vista computacional y más complicado de escalar. Este artículo tratará el escenario en el que entrenamos un par de modelos (AutoARIMA o ETS) por serie temporal univariante. Para esta configuración, agrupamos los datos completos por serie temporal y luego entrenamos cada modelo para cada grupo. La siguiente imagen ilustra esto. El DataFrame distribuido puede ser un Spark o un Dask DataFrame.AutoARIMA por partición: imagen de AuthorNixtla anteriormente lanzó puntos de referencia con Anyscale sobre la distribución de este modelo de capacitación en Ray. La configuración y los resultados se pueden encontrar en este blog. Los resultados también se muestran a continuación. Se necesitaron 2000 CPU para ejecutar un millón de modelos AutoARIMA en 35 minutos. Compararemos esto con la ejecución en Spark.Resultados de StatsForecast en Ray: imagen del autor En primer lugar, veremos el código de StatsForecast utilizado para ejecutar AutoARIMA de forma distribuida en Ray. Esta es una versión simplificada para ejecutar el escenario con una serie de tiempo de un millón. También se actualizó para la versión reciente de StatsForecast v1.0.0, por lo que puede verse un poco diferente del código en los puntos de referencia anteriores. Ejecutar StatsForecast de forma distribuida en Ray La interfaz de StatsForecast es mínima. Ya está diseñado para realizar el AutoARIMA sobre cada grupo de datos. Solo proporcionando la ray_address hará que este fragmento de código se ejecute de forma distribuida. Sin él, n_jobs indicará el número de procesos paralelos para la previsión. model.forecast() hará el ajuste y la predicción en un solo paso, y la entrada a este método en el horizonte de tiempo para pronosticar. Fugue es una capa de abstracción que transfiere código de Python, Pandas y SQL a Spark y Dask. La interfaz más mínima es la función transform(). Esta función toma una función y DataFrame, y la lleva a Spark o Dask. Podemos usar la función transform() para llevar la ejecución de StatsForecast a Spark. Hay dos partes en el código a continuación. Primero, tenemos la lógica de pronóstico definida en la función Forecast_series. Algunos parámetros están codificados para simplificar. El más importante es que n_jobs=1 . Esto se debe a que Spark o Dask ya funcionarán como la capa de paralelización, y tener dos etapas de paralelismo puede causar interbloqueos de recursos. Al ejecutar Statsforecast en Spark con FugueSecond, la función transform() se usa para aplicar la función forecast_series() en Spark. Los dos primeros argumentos son el marco de datos y la función que se aplicará. El esquema de salida es un requisito para Spark, por lo que debemos pasarlo, y el argumento de partición se encargará de dividir el modelado de la serie temporal por ID único. Este código ya funciona y devuelve una salida de Spark DataFrame. La transformación () anterior es una una mirada general a lo que Fugue puede hacer. En la práctica, los equipos de Fugue y Nixtla colaboraron para agregar un FugueBackend más nativo a la biblioteca de StatsForecast. Junto con él, hay una función de pronóstico de utilidad () para simplificar la interfaz de pronóstico. A continuación se muestra un ejemplo de extremo a extremo de la ejecución de StatsForecast en un millón de series temporales. Solo necesitamos crear el FugueBackend, que toma una SparkSession y la pasa a forecast() . Esta función puede tomar un DataFrame o una ruta de archivo a los datos. Si se proporciona una ruta de archivo, se cargará con el backend paralelo. En este ejemplo anterior, reemplazamos el archivo cada vez que ejecutamos el experimento para generar puntos de referencia. También es importante tener en cuenta que podemos probar localmente antes de ejecutar el pronóstico () con datos completos. Todo lo que tenemos que hacer es no proporcionar nada para el argumento paralelo; todo se ejecutará en Pandas secuencialmente. Los resultados de referencia se pueden ver a continuación. Al momento de escribir este artículo, Dask y Ray realizaron lanzamientos recientes, por lo que solo las métricas de Spark están actualizadas. Haremos un artículo de seguimiento después de ejecutar estos experimentos con las actualizaciones.Pruebas comparativas de Spark y Dask para StatsForecast a escala Nota: El intento fue utilizar 2000 cpus, pero estábamos limitados por las instancias informáticas disponibles en AWS. La parte importante aquí es que AutoARIMA entrenó un millón de modelos de series temporales en menos de 15 minutos. La configuración del clúster se adjunta en el apéndice. Con muy pocas líneas de código, pudimos orquestar el entrenamiento de estos modelos de series temporales de forma distribuida. Entrenar miles de modelos de series temporales de forma distribuida normalmente requiere mucha codificación con Spark y Dask, pero pudimos ejecutar estos experimentos con muy pocos líneas de código. StatsForecast de Nixtla ofrece la capacidad de utilizar rápidamente todos los recursos informáticos disponibles para encontrar el mejor modelo para cada serie temporal. Todo lo que los usuarios deben hacer es proporcionar un backend paralelo relevante (Ray o Fugue) para ejecutar en un clúster. En la escala de un millón de series temporales, nuestro tiempo total de capacitación tomó 12 minutos para AutoARIMA. Esto es el equivalente a cerca de 400 horas de CPU que ejecutamos de inmediato, lo que permite a los científicos de datos iterar rápidamente a escala sin tener que escribir el código explícito para la paralelización. Debido a que usamos un clúster efímero, el costo es efectivamente el mismo que ejecutar esto secuencialmente en una instancia EC2 (en paralelo en todos los núcleos). Nixtla StatsForecast repoStatsForecast docsFugue repoFugue tutorialsPara chatear con nosotros:Fugue SlackNixtla SlackPara cualquiera. interesado en la configuración del clúster, se puede ver a continuación. Esto activará un clúster de Databricks. Lo importante es el node_type_id que tiene las máquinas utilizadas. {
“num_trabajadores”: 20,
“cluster_name”: “fuga-nixtla-2”,
“chispa_versión”: “10.4.x-scala2.12”,
“chispa_conf”: {
“chispa.especulación”: “verdadero”,
“chispa.sql.shuffle.particiones”: “8000”,
“spark.sql.adaptive.enabled”: “falso”,
“chispa.tarea.cpus”: “1”
},
“atributos_aws”: {
“first_on_demand”: 1,
“disponibilidad”: “SPOT_WITH_FALLBACK”,
“zone_id”: “us-west-2c”,
“spot_bid_price_percent”: 100,
“ebs_volume_type”: “GENERAL_PURPOSE_SSD”,
“ebs_volume_count”: 1,
“ebs_volume_size”: 32
},
“node_type_id”: “m5.24xgrande”,
“driver_node_type_id”: “m5.2xgrande”,
“ssh_public_keys”: [],
“etiquetas_personalizadas”: {},
“chispa_env_vars”: {
“MKL_NUM_THREADS”: “1”,
“OPENBLAS_NUM_THREADS”: “1”,
“VECLIB_MAXIMUM_THREADS”: “1”,
“OMP_NUM_THREADS”: “1”,
“NUMEXPR_NUM_THREADS”: “1”
},
“autoterminación_minutos”: 20,
“habilitar_disco_elástico”: falso,
“cluster_source”: “IU”,
“init_scripts”: [],
“runtime_engine”: “ESTÁNDAR”,
“cluster_id”: “0728-004950-oefym0ss”
}