Многопоточная выгрузка одного сообщения обмена

Публикация описывает, как можно распараллелить выгрузку одного сообщения обмена.

В своей статье о планах обмена я поделился результатами исследования этого объекта конфигурации.

В этой публикации я хочу озвучить идею как можно распараллелить выгрузку сообщения обмена.

Заранее прошу прощения если это «баян». В таком случае просьба дать ссылки где почитать источники.

Код состоит из двух частей:

1. Управление формированием фоновых заданий и объединение результатов их работы.

2. Управление выполнением списка фоновых заданий.

Полный исходный код реализации идеи находится во вложенном к публикации файле.

Идея заключается в следующем:

1. Мы как обычно начинаем выгрузку.

2. В момент, когда начинаем выбирать изменения, мы распараллеливаем запись файла сообщения обмена по объектам конфигурации, которые входят в состав плана обмена. Для каждого такого объекта мы создаём отдельное фоновое задание и пишем все его изменения в отдельный файл — часть сообщения обмена. Подробнее можно посмотреть в моей статье в разделе «Запись изменений в сообщение обмена».

3. Ожидаем завершения выполнения всех сформированных заранее фоновых заданий.

4. Объединяем результаты фоновых заданий в один главный файл сообщения обмена.

5. Завершаем выгрузку как обычно.

Код главной управляющей процедуры выглядит следующим образом:

Процедура СформироватьСообщениеОбмена()

УзелОбмена = ПланыОбмена.Тестовый.НайтиПоНаименованию("Узел получатель", Истина);

// Получаем список объектов конфигурации, который входят в состав плана обмена
СоставПланаОбмена = Метаданные.ПланыОбмена.Тестовый.Состав;

// Открываем главный файл сообщения обмена
ЗаписьXML = Новый ЗаписьXML();
ПолноеИмяФайла = КаталогСообщений + "" + Строка(УзелОбмена.УникальныйИдентификатор()) + ".xml";
ЗаписьXML.ОткрытьФайл(ПолноеИмяФайла);

// Создаём объект "ЗаписьСообщенияОбмена" и блокируем узел (объектная блокировка)
ЗаписьСообщения = ПланыОбмена.СоздатьЗаписьСообщения();
ЗаписьСообщения.НачатьЗапись(ЗаписьXML, УзелОбмена);
НомерСообщения = ЗаписьСообщения.НомерСообщения;

// Массив фоновых заданий - по одному на каждый объект конфигурации, который входит в состав плана обмена
СписокФоновыхЗаданий = Новый Массив();

// Количество постоянно активных фоновых заданий.
// Управление их выполнением осуществляется специальными процедурами (см. ниже).
// Обычно их количество должно равняться количеству ядер сервера.
// На практике нужно подбирать их количество в зависимости от загрузки сервера.
// Слишком большое количество фоновых заданий может "положить" сервер.
КоличествоАктивныхЗаданий = 4;

// Цикл формирования списка фоновых задний и их параметров
Для Каждого Элемент Из СоставПланаОбмена Цикл

ИмяЗадания = "Экспорт сообщения № " + Формат(НомерСообщения, "ЧГ=0") + ": " + Элемент.Метаданные.ПолноеИмя();

Параметры = Новый Массив();
Параметры.Добавить(УзелОбмена);
Параметры.Добавить(НомерСообщения);
Параметры.Добавить(Элемент.Метаданные);

ПараметрыФоновогоЗадания = Новый Структура();
ПараметрыФоновогоЗадания.Вставить("ИмяЗадания", ИмяЗадания);
ПараметрыФоновогоЗадания.Вставить("ИмяПроцедуры", "ОбщийМодульОбменаДанными.ВыполнитьВыгрузкуИзмененийОбъектаКонфигурации");
ПараметрыФоновогоЗадания.Вставить("ПараметрыПроцедуры", Параметры);

СписокФоновыхЗаданий.Добавить(ПараметрыФоновогоЗадания);

КонецЦикла;

// Передача списка фоновых заданий управляющим процедурам на выполнение
// Формируем части сообщения обмена по одному файлу для каждого объекта конфигурации
ВыполнитьФоновыеЗадания(СписокФоновыхЗаданий, КоличествоАктивныхЗаданий);

// Собираем все части сообщения обмена вместе в главном файле (см. выше)
СформироватьЕдиноеСообщениеОбмена(СоставПланаОбмена, УзелОбмена, НомерСообщения, ЗаписьXML);

// Завершаем формирование сообщения обмена
ЗаписьСообщения.ЗакончитьЗапись();

КонецПроцедуры

Очень важно ничего не напутать с именованием файлов частей сообщения. Для этого код формирования такого имени вынесен в отдельную функцию. Вот она:

Функция ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных)

Возврат КаталогСообщений +
"" +
Строка(УзелОбмена.УникальныйИдентификатор()) +
"_" +
Формат(НомерСообщения, "ЧГ=0") +
"_" +
ОбъектМетаданных.Имя + ".xml";

КонецФункции

Процедура, которая формирует файл части сообщения:

// Процедура общего модуля для вызова из фонового задания и формирования части сообщения по объекту метаданных
Процедура ВыполнитьВыгрузкуИзмененийОбъектаКонфигурации(УзелОбмена, НомерСообщения, ОбъектМетаданных) Экспорт

Выборка = ПланыОбмена.ВыбратьИзменения(УзелОбмена, НомерСообщения, ОбъектМетаданных);

Если Не Выборка.Следующий() Тогда
Возврат;
КонецЕсли;

ЗаписьXML = Новый ЗаписьXML();
ПолноеИмяФайла = ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);
ЗаписьXML.ОткрытьФайл(ПолноеИмяФайла);

ЗаписатьXML(ЗаписьXML, Выборка.Получить());

Пока Выборка.Следующий() Цикл
ЗаписатьXML(ЗаписьXML, Выборка.Получить());
КонецЦикла;

ЗаписьXML.Закрыть();

КонецПроцедуры

Собираются все части вместе такой процедурой:

Процедура СформироватьЕдиноеСообщениеОбмена(СоставПланаОбмена, УзелОбмена, НомерСообщения, ЗаписьXML)

Для Каждого ОбъектМетаданных Из СоставПланаОбмена Цикл

ПолноеИмяФайла = ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);

Файл = Новый Файл(ПолноеИмяФайла);
Если Файл.Существует() Тогда

ЧтениеТекста = Новый ЧтениеТекста(ПолноеИмяФайла, КодировкаТекста.UTF8);

// Дописываем часть в главный файл сообщения обмена
ЗаписьXML.ЗаписатьБезОбработки(ЧтениеТекста.Прочитать());

КонецЕсли;

КонецЦикла;

КонецПроцедуры

Внимание!

Накладные расходы на создание файлов частей сообщения + последующее их объединение в один главный файл сообщения могут «съесть» всю выгоду от распараллеливания процесса. Это зависит от размеров такого сообщения. Тестируйте код перед его применением!

24 Comments

  1. TODD22

    Этот код я так понимаю пишет в один файл? То есть это для выгрузки в один узел?

    А можно таким же способом выгружать сразу по нескольким узлам? Например по 4 магазинам?

    Reply
  2. zhichkin

    (1)

    Этот код я так понимаю пишет в один файл? То есть это для выгрузки в один узел?

    Этот код пишет несколькими заданиями в один файл — формирует одно сообщение обмена. Соответственно это сообщение для одного узла.

    А можно таким же способом выгружать сразу по нескольким узлам? Например по 4 магазинам?

    Если дописать логику формирования сообщений по нескольким узлам, то да, можно. Хоть для 100 узлов одновременно, если сервер потянет.

    Reply
  3. caponid

    Вот тут «ЧтениеТекста.Прочитать()» вполне возможно получить когда нибудь «Out of memory». причем внезапно.. память течет и фрагментируется.

    А вот тут «Выборка.Получить()» вместо объекта — блокировку

    Reply
  4. zhichkin

    (3) Спасибо за дельный комментарий!

    Вы правы — в таком виде код отправлять в бой никак нельзя.

    Цель статьи: озвучить идею и дать прототип решения.

    В целях упрощения восприятия исходный код максимально сокращён.

    Reply
  5. zhichkin

    (3) Кстати, при управляемом режиме блокировок «Выборка.Получить()» не рискует нарваться на блокировку. Получится «грязное» чтение, так как при read_commited_snapshot = ON мы получим версию данных до начала блокирующей транзакции (UPDATE или DELETE в данном случае).

    Reply
  6. Makushimo

    Про выгрузку по правилам обмена тут можно и не заикаться?

    Reply
  7. zhichkin

    (6) Почему Вы так решили? Можно конкретный пример?

    ПКО, например, можно передать в фоновое задание в виде параметра … Возможно это потребует некоторой переделки самих правил, но сам механизм КД, на мой взгляд, это не отменяет.

    Reply
  8. caponid

    (4)

    Вы правы, блокировки обычно вот тут возникают — ПланыОбмена.ВыбратьИзменения

    И дополнение — зачем все переносить в один файл? — это все таки довольно затратная операция. Проще все таки тогда все передать пакетом файлов — записать их имена в сообщение обмена — да и при загрузке многопоточку организовать

    Как то так…

    Для Каждого ОбъектМетаданных Из СоставПланаОбмена Цикл
    КраткоеИмяФайла = ПолучитьКраткоеИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);
    Файл = Новый Файл(ПолноеИмяФайла);
    Если Файл.Существует() Тогда
    ЗаписьXML.ЗаписатьНачалоЭлемента(«metaFiles», «http://metafiles»);
    ЗаписьXML.ЗаписатьАтрибут(«metaName», ОбъектМетаданных.ПолноеИмя());
    ЗаписьXML.ЗаписатьАтрибут(«File», КраткоеИмяФайла);
    ЗаписьXML.ЗаписатьКонецЭлемента();
    КонецЕсли;
    КонецЦикла;

    Показать

    Reply
  9. zhichkin

    (8) Да, «ВыбратьИзменения» генерирует UPDATE номера сообщения в таблице регистрации изменений объекта. Если таблица большая, то эта команда выполняется долго и это может приводить даже к ошибкам превышения таймаута ожидания. Подробнее можно почитать в статье, ссылку на которую я даю в этой публикации.

    По поводу отправки пакета файлов и организации многопоточной загрузки данных на принимающей стороне: согласен, можно (вероятно даже нужно) делать и так. Фантазия ограничена только возможностями. Цель этой публикации была именно в том, чтобы показать возможность.

    Reply
  10. caponid

    Не тянет это на публикацию — нет цельной идеи.

    Reply
  11. Tangram

    Сейчас на практике реализую порционный обмен (по правилам обмена).

    Я пошел другим путем — весь массив изменений, заранее выбранный запросами, бьется на порции, а потом каждая порция выбирается и выгружается.

    По метаданным неудобно — в выгрузке может быть несколько сотен одинаковых объектов, например Реализаций, велика вероятность нарваться на блокировку.

    Reply
  12. zhichkin

    (11)

    потом каждая порция выбирается и выгружается

    Это тоже вариант.

    Уточните, пожалуйста, изменения заранее выбираются куда? В оперативную память?

    Вы ПланыОбмена.ВыбратьИзменения используете? Что указываете в качестве третьего параметра фильтрации выборки?

    велика вероятность нарваться на блокировку

    Уточните, пожалуйста, какую блокировку Вы имеете ввиду? В какой момент выгрузки?

    Обычно блокировка, которая мешает, это команда UPDATE при вызове ПланыОбмена.ВыбратьИзменения. Она возникает только на этом этапе и её длительность зависит исключительно от количества изменений в таблице и третьего параметра фильтрации этого метода. Предполагаю, что Вы используете массив ссылок на объекты, который является порцией данных … Если это так, то, вероятно, это самая удачная реализация выборки изменений.

    Reply
  13. Tangram

    (12) да , изменения заранее выбираются в память. сейчас обмен в стадии тестов, в боевой базе у меня первоначально 70000-200000 объектов к выгрузке, надеюсь, сервер х64 простит мне это

    В ВыбратьИзменения в третий параметр отдаю массив из 50-200 ссылок на объекты, выбранный из основного массива.

    Reply
  14. Tangram

    а вот про блокировки интересно…

    Мои опыты на КА 1.1 (с изменениями) показывают следующее:

    ВыбратьИзменения() отрабатывает оч. быстро, данные из боевой базы: 177000 объектов в выборке за 6 секунд.

    А вот потом начинается мистика (или суровая реальность), из-за которой я затеял порционный обмен:

    фактически все 177000 объектов блокируются на запись до окончания выгрузки до операторов

    // Завершаем запись сообщения

    ЗаписьСообщения.ЗакончитьЗапись();

    ЗаписьXML.Закрыть();

    Причем это я проверял даже на копии: выбираю все изменения, там 150 документов «Реализация». выбираю из них первые 50 и на них делаю ВыбратьИзменения(). иду отладчиком по процедуре выгрузки (я слегка модиф. типовую) и в другом сеансе пытаюсь проводить документы. Пока не пройдет ЗакончитьЗапись(), ни один из «выбранных» 50 документов перепровести нельзя (стандартная ошибка MS SQL на превышение времени блокировки), остальные пожалуйста.

    Reply
  15. Tangram

    А вот сейчас воспроизвел ситуацию на типовой КА 1.1.61.2 и все документы, попавшие в ВыбратьИзменения(), спокойно перепроводятся в процессе выгрузки…

    Завтра попробую 1.1.78 и походу буду искать блокировку в доработках…

    Reply
  16. zhichkin

    (14)

    все 177000 объектов блокируются

    Какой режим управления блокировками у Вас используется (автоматический или управляемый)? Где начинается и заканчивается транзакция? Нет ли эскалации блокировок? Какой набор данных блокируется на самом деле (нет ли сканов)? На все эти вопросы нужно дать однозначный ответ.

    Не настаиваю, но рекомендую почитать мою статью про планы обмена: http://infostart.ru/public/561460/ Там всё очень подробно расписано, в том числе про блокировки. Кстати, если будут замечания или дополнения, то я с удовольствием внесу их в статью =)

    Reply
  17. zhichkin

    (15) Интересно, а у Вас случайно нет там где-нибудь начала транзакции типа «НачатьТранзакцию» после «ВыбратьИзменения» или до этого, а «ЗафиксироватьТранзакцию» где-нибудь после «ЗакончитьЗапись» ? Чисто теоретически получилась бы одна большая транзакция, для которой все остальные были бы вложенными … Честно говоря, была у меня мысль проверить такой случай, но руки пока что не дошли =) Если это неожиданно так, то границы транзакции могут быть значительно шире …

    Reply
  18. Tangram

    (17)

    а у Вас случайно нет там где-нибудь начала транзакции

    есть, причем это типовой код…. сейчас проверю…

    Reply
  19. zhichkin

    (18) Если у Вас на уровне SQL Server не отключена эскалация блокировок до уровня всей таблицы для таблицы регистрации изменений, то UPDATE более 5000 записей этой таблицы в результате вызова метода «ВыбратьИзменения» может привести к блокировке всей таблицы. Кажется я об этом тоже в своей вышеупомянутой статье писал …

    Reply
  20. Tangram

    типовой код примерно такой:

    НачатьТранзакцию();

    ВыбратьИзменения(…все…);

    Далее Цикл по выборке, но через каждые условно 10 объектов

    идет ЗафиксироватьТранзакцию(); НачатьТранзакцию();

    и в типовой конфе реально объекты, захваченные в первой десятке, отпускаются после первого же ЗафиксироватьТранзакцию().

    а в моей базе почему-то не так…

    Спасибо большое за наводки, завтра на свежую голову буду копать дальше. Если что интересное для науки найду, напишу ).

    Reply
  21. kolya_tlt

    Добрый день, Дмитрий.

    вы не пробовали объединить общие для всех узлов справочникирегистры и выгрузить их один раз для всех узлов?

    Reply
  22. zhichkin

    (21) Нет не пробовал. Если бы я решал подобную задачу, то я бы отказался от регистрации изменений по всем узлам. Делал бы это для одного какого-то узла по умолчанию, например, а потом выгружал бы один раз.

    Кроме этого, после выгрузки по всем узлам нужно обновить значение реквизита «НомерОтправленного» для каждого из них. При этом обновить это значение нужно не только в плане обмена, но и в служебной таблице регистрации изменений объекта. Без вызова метода «ВыбратьИзменения» плана обмена сделать это невозможно, а первым обязательным (!) параметром этого метода является узел плана обмена. Короче говоря, всё сильно связанно друг с другом …

    Reply
  23. kolya_tlt

    (22) поясните почему вы выбрали решение с одним узлом и как поддержка будет в этом случае контролировать обмен на скажем 500 магазинов?

    Reply
  24. zhichkin

    (23) Стоп =) Конкретного решения для конкретной ситуации я не принимал =) Я дал, так скажем, дружеский совет =) Не более того =)

    Если Вам нужно конкретное решение, то дайте, пожалуйста, конкретные цифры, описывающие параметры Вашей системы, описание текущих проблем, сформулируйте конкретные требования (пожелания) по решению этих проблем, целевые показатели системы, которых Вы хотели бы достигнуть. Вот тогда на основании анализа совершенно конкретных данных я смогу выбрать решение и даже обосновать его =)

    По поводу 500 магазинов … На моей практике РИБ в районе 150 узлов «умирает» или требует постоянного внимания и рукоприкладства. В таких случаях делается своя нетиповая разработка.

    Reply

Leave a Comment

Ваш адрес email не будет опубликован. Обязательные поля помечены *