Twitter Storm

Storm является распределенной системой для выполнения вычислений в реальном времени. Она родилась в рамках проекта Backtype, который специализировался на аналитике твитов и который в июле 2011 был приобретен Twitter. Так же как ApacheHadoop предоставляет набор базовых абстракций, инструментов и механизмов для пакетной обработки данных, Twitter Storm делает это для задачи обработки данных в режиме реального времени. Хотите узнать в чем их отличие?

Отличие

Не смотря на то, что Storm изначально появился на свет в процессе неудачных попыток приспособить Hadoop к задаче обработки данных в реальном времени, сравнивать их некорректно. Никакой хак или патч не сможет заставить Hadoop работать по-настоящему в режиме реального времени, так как в его основе лежит фундаментально другая концепция и набор принципов, которые актуальны лишь в контексте задачи пакетной обработки данных. Storm можно представить как "Hadoop для вычислений в реальном времени", но по факту между ними нет практически ничего общего, кроме изначально-распределенной природы, слегка похожей архитектуры, работы внутри JVM и публичной доступности. Для понимания задачи, которая стоит перед Storm, лучше взглянуть на то, как она обычно решается.

Традиционно, если перед проектом или бизнесом вставала задача обработки какой-то информации в реальном времени, то она в итоге сводилась к цепочке преобразований данных и распределялась по серверам, которые их выполняют и передают результаты друг другу посредством сообщений и очередей-посредников. При таком подходе существенная часть времени уходила на маршрутизацию сообщений, настройку и развертывание новых промежуточных очередей и обработчиков, обеспечение отказоустойчивости и надежности. По сути Storm берет все вышеперечисленное на себя, позволяя разработчикам сосредоточиться на реализации логики обработки сообщений.

Особенности

Итак, основные особенности Storm, вытекающие из требований к подобным системам:

  • Три основных варианта использования, но ими он не ограничивается:

    • Обработка потоков сообщений(stream processing) в реальном времени, с возможностью внесения изменений во внешние базы данных;
    • Постоянные вычисления (continuous computation) на основе источников данных с публикацией результатов произвольным клиентам в реальном времени;
    • Распределенные удаленные вызовы(distributed RPC) с выполнением комплексных вычислений параллельно во время запроса.
  • Масштабируемость: Storm может обрабатывать огромное количество сообщений в секунду. Для масштабирование необходимо лишь добавить сервера в кластер и увеличить параллельность в настройках топологии. В одном из первых приложений для Storm обрабатывался 1 миллион сообщений в секунду на кластере из 10 серверов, при этом выполнялось несколько сотен запросов в секунду к внешней базе данных.

  • Гарантия отсутствия потерь данных: в отличии от других систем обработки сообщений в реальном времени (например S4 от Yahoo!) это свойство изначально является частью архитектуры Storm. Для этого используется механизм подтверждения (acknowledgement) успешной обработки каждого конкретного сообщения.
  • Стабильность: в то время как Hadoop позволительны простои по несколько часов, так как он априори не является системой реального времени, одной из основных целей Storm является стабильная бесперебойная работа кластера, с максимально безболезненным его управлением.
  • Защита от сбоев: если что-то пошло не так во время выполнения вычисления, Storm переназначит задачи и попробует снова. В его задачи входит обеспечение бесконечной работы вычислений (или до момента запланированной или ручной остановки).
  • Независимость от языка программирования: в то время как большая часть системы написана на Clojure и работает в JVM, сами компоненты системы могут быть реализованы на любом языке, что удобно для проектов, использующих в основном другие технологии.

У Вас уже могло сложиться общее представление, о том что собой представляет Twitter Storm и насколько он актуален лично для Вас или Вашего проекта. Если интерес все еще не погас, предлагаю перейти к концепции, предлагаемой Storm для разработки приложений под эту платформу.

Концепция

Для начала пройдемся по основным абстракциям, которые используются в Storm:

  • Поток(Stream): неограниченный поток сообщений, представленных в виде кортежей (произвольных именованный список значений). При этом все кортежи в одном потоке должны иметь одинаковую схему: элемент на каждой позиции должен иметь один и тот же тип данных и значение.
  • Струя воды из крана (Spout): источник потоков, который берет их из какой-то внешней системы.
  • Cтруя состояния (state spout): предоставляет распределенный доступ к некому общему состоянию, которое кэшируется в памяти на исполнителях и синхронно обновляется при внешних изменениях. Таким образом возможно избежать обращений к внешней базе данных при обработке каждого сообщения. В случае с Twitter этим общим состоянием является сам социальный граф.
  • Молния(Bolt): обрабатывает входящие потоки и создает исходящие потоки, производя какую-либо обработку данных (по сути здесь реализуется основная бизнес-логика). Помимо этого никто не запрещает использовать при обработке какие угодно внешние сервисы вроде СУБД.
  • Топология(Topology): произвольная связанная сеть из "молний" и "струй". При создании топологии можно указать:
    • уровень параллелизма для каждого компонента, что создаст необходимое количество его потоков исполнения в кластере.
    • группировку потоков, то есть как именно сообщения будут распределяться между созданными потоками исполнения каждого компонента, есть четыре основных варианта - случайно (shuffle), каждый получит по копии (all), хэш по определенным полям сообщения (fields), один поток получает все сообщения  (global).

Таким образом, для создания приложения для обработки данных в реальном времени с использованием Storm, необходимо:

  1. Определить схему(ы) потока(ов) сообщений.
  2. Реализовать источник(и) сообщений, основанные на парсинге каких-то внешних данных (для Backtype это был Twitter firehose, поток всех твитов) или реакции на события (допустим действия пользователей в виде HTTP-запросов).
  3. Реализовать обработчик(и) сообщений, которые преобразуют входящие сообщения и либо создают новые потоки сообщений, либо как-то влияют на внешний мир, например изменяя что-то в базе данных (они используют Cassandra для этого).
  4. Объединить реализованные компоненты в топологию и запустить её на кластере.
  5. При необходимости оптимизировать систему, включив общее состояние в топологию.

С точки зрения разработчика приложения большего знать и не нужно, но самое интересное происходит как раз дальше. Что собой представляет Storm-кластер и как с его помощью исполняется реализованное описанным выше способом приложение?

Архитектура

Проект очень сильно завязан на Zookeeper для координации работы кластера, с чем он очень неплохо справляется. Все остальные компоненты системы системы не содержат в себе состояния, что обеспечивает их быстрый запуск, даже после kill -9.

Storm Cluster

В остальном все достаточно просто:

  • Мастер-сервер (Nimbus) отвечает за распространение кода, распределение задач и мониторинг сбоев.
  • На каждом сервере в кластере запускается процесс-надсмотрщик (Supervisor), который запускает локально потоки исполнения, отвечающие за выполнение назначенных ему компонентов топологий.
  • Передача сообщений между компонентами топологий осуществляется напрямую, посредством ZeroMQ.
  • Топологии являются Thrift-структурами, а мастер-сервер - Thrift-сервисом, что позволяет осуществлять регистрацию топологий и другие операции программно из любого языка программирования.

Присутствующий в единственном экземпляре мастер-сервер является единственной точкой отказа лишь на первый взгляд. По факту он используется лишь для внесение изменений в кластер и топологии, так что его непродолжительное отсутствие не повлияет на функционирование запущенных вычислений. А так как состояние кластера хранится в Zookeeper, то запуск мастера на другой машине в случае аппаратного сбоя - вопрос лишь грамотно настроенного мониторинга и максимум одной минуты.

Используемый механизм подтверждений успешной обработки сообщения (acknowledgement) гарантирует, что все сообщения, попавшие в систему, рано или поздно будут обработаны, даже при локальных сбоях оборудования. Хотя более глобальные катаклизмы вроде "потери" стойки все же могут нарушить функционирование системы, про работу в нескольких датацентрах речь также не идет.

Планы на будущее

  • Использование Mesos для распределения и изоляции вычислительных ресурсов.
  • Изменение кода "на лету", сейчас для этого нужно остановить старую топологию и запустить новую, что может означать простой в пару минут.
  • Автоматическое определения необходимого уровня параллельности и адаптация под изменения в интенсивности входящего потока сообщений.
  • Еще более высокоуровневые абстракции.

Подводим итоги

  • На самом деле подход, лежащий в основе Storm, не является чем-то кардинально-новым. Помимо упоминавшегося выше S4 можно найти еще несколько альтернатив, пускай и менее близких по идеологии. Подробнее про эту тему можно узнать погуглив complex event processing или real-time stream processing.
  • Storm выделяет из их числа простота, гибкость, масштабируемость и отказоустойчивость в одном флаконе. Обеспечивает это в первую очередь простая и понятная архитектура, основанная на (уже) проверенном временем и многими проектами распределенном координаторе в виде Zookeeper.
  • Хоть за проектом и стоит крупный интернет-проект в лице Twitter, он достаточно молод и нужно быть морально готовым к возможным сбоям и неудачным моментам. Плюс не забывайте, что существенная часть написана на Clojure - для, пожалуй, большинства разработчиков изучение исходников проекта будет капитальным "выносом мозга". Мое первое знакомство с Lisp(Clojure - его диалект, работающий в JVM) надолго засело в памяти из-за обилия скобочек за каждым углом :)
  • В любом случае из доступных opensource реализаций систем для распределенных вычислений в реальном времени Storm на мой взгляд является наиболее перспективным для применения в интернет-проектах.
  • Если Вашему проекту нужна лишь одна-две топологии и особо большого кластера не планируется, то подобную схему достаточно не сложно реализовать и просто посредством Zookeeper + ZeroMQ или альтернативных технологий. Это избавит проект от возможных заморочек с Clojure и другими "особенностями" Storm, ценой вероятно существенно большей собственной кодовой базы, которую придется самостоятельно тестировать и поддерживать. Какой путь ближе - команда каждого проекта решает для себя сама.
  • Помимо различных вариаций веб-аналитики заманчивыми применениями подобной системы в Интернете может стать:
    • построение индекса для поисковых систем, на сколько я знаю от MapReduce здесь отказался только Google;
    • поведенческий таргетинг для рекламы - собираем действия пользователей и делаем на их основе выводы в реальном времени;
    • ведение рейтингов чего-либо в реальном времени - в зависимости от специфики проекта можно определять и показывать лучшие, самые просматриваемые или самые комментируемые статьи/фото/видео/музыку/товары/комментарии/что-нибудь-еще;
    • предлагаем свои варианты в комментариях.

Удачи в построении приложений для вычислений в реальном времени и до встречи на страницах Insight IT!

Источники информации