1c + kafka.apache











Пример построения микросервисов с использованием apache kafka. Данная статья будет полезна интеграторам, программистам. Версия и релиз технологической платформы не имеет значения.

Пример построения микросервисов с использованием apache kafka.

Задача:

Построить событийную архитектуру между приложениями(микросервисы)(не только 1с).

Общая логика решения задачи():

1. Приложение А генерирует событие                                                         

2. Другое приложение Б(В, Г..) "слушает"  кафка (подписывается на событие)(если коротко)

3. Приложение А отправляет данные кафка (Поставщик)

4. кафка отправляет данные всем кто подписан на это событие (Потребитель)

5. Приложение Б(В, Г..)  получает данные

Технологии:

1. 1с (поставщик,  потребитель, tcp клиент)

2.  Разнородный бэкенд (python,  и тд) (поставщик,  потребитель)

3. с++ 1с вк native (транспорт для tcp клиента 1с)

4. apache kafka (шина данных) https://kafka.apache.org

5. flask (python micro web сервер) http://flask.pocoo.org

6. tcp сервер (python)

7. KafkaConsumer, KafkaProducer  (python) https://github.com/dpkp/kafka-python

Общая принцип работы(модули):

1с(1):

При запуске 1с стартует tcp клиент (вк native)(3) .Устанавливает соединение с tcp сервером(6) (передавая ид пользователя из 1с). Подписывается на событие вк native. При наступления события уходит в контекст 1с через ОбработкаВнешнегоСобытия для последующей обработки. Для генерации событий выполняет гет, пост запросы на  веб сервер(5).

tcp сервер(6):

Хеш таблица открытых сокетов(активных соединений) и ид пользователя. Подписывается на события шины данных для отправки клиенту 1с. При наступлении событий отправляет данные в сокет(клиенту 1с). Генерирует события при авторизации(установка сокета), выхода(разрыв, закрытие сокета) в шину данных.

web сервер(5):

Обрабатывает запросы со стороны 1с(прокси для кафка) и отправляет в шину данных. Также подписывается на события сторонних приложений требующих отправку данных для 1с(изменился статус звонка asterisk, прилетел тригер для обновления формы, перегнать что то с &НаСервере на &НаКлиенте , и тд).

Шина данных(4):

Управляет оркестром приложений(упорядоченная очередь). При наступлении события маршутизирует подписчикам, если подсписчиков нет, хранит у себя, доставит потом когда появятся.

Разнородный бэкенд(2):

Подписывается, генерирует события в шину данных.(например web app time line(журнал записи), asterisk ami client(автообзвон, фонер), из предыдущих статей)

Шаг 1. tcp сервер 

 

 tcp сервер

установка сокета(прослушка):
while True:

    try:
        buf = request.recv(256).decode(‘utf-8’)
    except:
        break

отправка в сокет:

sock.sendall(message.value)

генерация события для шины данных: 

producer.send(f’user_auth’, key=key, value=value) 

подписка,обработка события:

consumer = KafkaConsumer(‘ones_socket_send_user’, ‘ones_socket_send_user_send_all’, ‘ones_socket_add_listener’, 
                                 group_id=’my-group’, bootstrap_servers=[‘192.168.777.555:9092’]) 
for message in consumer:
    if message.topic=="ones_socket_send_user":

Шаг 2. web сервер

 

 start

 

 route

 

 app

регистрация енд поинт(точка входа, путь и тд):

@app.route(‘/api_user_event’,  methods=[‘GET’, ‘POST’])
def api_user_event(): …

Шаг 3. Пример генерации из стороннего приложения

 

 time line triger update

 

 asterisk ami event triger

def event_listener(event, **kwargs):
if event.name in EVENTS_NOT_LISTEN:
return

#print(f"""{str(event)}""")

if event.name == 'Newstate' or event.name == 'Hangup' or event.name == 'VarSet':
if event.name == 'VarSet':
if not event.keys['Variable'] in EVENTS_VARS_LISTEN:
return

for x in USER_EVENTS:
if USER_EVENTS[x]['channel'] in event.keys['Channel']:
d = {}

d['Действия'] = 'Asterisk_Event'
d['Данные'] = event.keys
d['Данные']['name'] = event.name
d['Данные']['id_ext'] = USER_EVENTS[x]['id_ext']
d['Данные']['event_date'] = datetime.now()

jdata = json.dumps(d, default=json_serial)

do_user_event(jdata, USER_EVENTS[x]['user'])

print(f"""{str(event)}""")

Прослушка ами:

client = AMIClient(address=AMI_ADDRESS, port=AMI_PORT)
future = client.login(username=AMI_USER, secret=AMI_SECRET)

client.add_event_listener(event_listener)

 

Шаг 4. 1с

 

пример кода для работы

Шаг 5. вк native

Полное создание вк native в статье не рассматривается.

 

(часть tcp клиента)

Шаг 6. 1c web services

Используется для передачи сообщения из бизнес приложения для отправки в 1с (серверная часть)(для дальнейшей обработка бизнес логики в контексте 1с) (создание документа, старт бизнес процесса, заполнение справочника) (конечно можно сделать сделать прямой вызов без использования сообщений, здесь идея в том что на один ответ 1c ws может быть в дальнейшем подписано несколько подписчиков, тем самым все получат сообщение и будут выполнят свою логику далее)      

 

1с ws consumer

Подключение к ws:

client = Client(ws, username = username, password = password, cache = NoCache(), timeout = 60)

Вызов метода в контексте 1с(метод ws сервиса):

eval_str = ‘client.service.%s(%s)’ % (method, jdata and "%r" % jdata or »)
    
    try:
        res = eval(eval_str)
    except Exception as e:
        print(e)
        return

Конструкция вида: upd_event_lists.append([]), и затем  _thread.start_new_thread(upd_loop, (i,)), сделано для того чтобы не завалить ws 1c. Те может случится так что прилетит много запросов и если делать send сразу происходит отказ в установке соединения (Удаленный компьютер отверг запрос на подключение, еррор вин сок и тд.)(проверено примерно на 1000 сообщения поданных вход модулю который выполнял _thread.start_new_thread(send, (username, password, jdata, method, ws,)) для каждого принятого сообщения, сгенерировав их в кафку предварительно с другого модуля, произошел отказ(пользователей было 400 на момент приема сообщений) ну и 1с чета затупила сразу, в плане установки новых соединений с рпхостом(рагент сдох короче)).

 

 1с http consumer

 

import json
from kafka import KafkaProducer, KafkaConsumer
#import _thread
import time
import requests
from requests.auth import HTTPBasicAuth


headers = {'Content-type': 'application/json'}
#'Accept': 'text/plain'}


ONES_USER = ''
ONES_PASSWORD = ''
ONES_HOST = '192.168.555.560'
ONES_HTTP = 'http://192.168.555.560/AA'

consumer = KafkaConsumer('ones_http_event', group_id='my-group', bootstrap_servers=['192.168.5.131:9092'])

producer = KafkaProducer(bootstrap_servers=['192.168.5.131:9092'])

for message in consumer:
print(f"""ones_http_event->{message}""")

method = message.key.decode('utf-8')

data = message.value.decode('utf-8')

jdata = data

username = ONES_USER
password = ONES_PASSWORD

data = json.loads(data)

id_request = None

if not data.get('id_request') is None:
id_request = data.get('id_request')

if not data.get('event_setting') is None:
event_setting = data.get('event_setting')

username = event_setting.get('username')
password = event_setting.get('password')

jdata = event_setting.get('jdata')

if not jdata.get('id_request') is None:
id_request = jdata.get('id_request')

auth_ones = HTTPBasicAuth(username, password)

r = None

try:
r = requests.get(f"""{ONES_HTTP}/hs/gate/v1?method={method}&data={jdata}""", headers=headers, auth=auth_ones)
#data=res_str,
except Exception as e:
print(e)

res = ''

if not r is None:
if r.status_code != 200:
res = r.reason
print(r.reason)
else:
res = r.text

if not id_request is None:
key = id_request.encode('utf-8')
value = res.encode('utf-8')

producer.send('ones_http_event_response', key=key, value=value)

 

в 1с это http сервис. Это работает быстрее ws, потому что ws это soap over http. Чтобы понять нужно cмотреть в структуру вызовов клиент-сервер SOAP.(получение wsdl, вызов метода soap и тд). http сервис — это голый http — get/post/put/head

Примеры работы:

1c посылает гет во внешнее приложение->внешнее приложение получает событие, обрабытвает генерирует событие на обновление клиента 1с отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.

1c посылает гет, пост на веб сервер->веб сервер отправляет в кафка->кафка марштузирует tcp серверу->tcp сервер отпраляет в сокет всем клиентам которые зарегистрированы по данному ид->1с обрабатывает внешние событие.

Пример отправки и получения 500 запросов + в процессе отправки генерация из внешних приложений.

Пример подключения/отключения в процессе отправки.

 

 

Деплой и идеология apache kafka в статье не рассматривается(написано тысячи статей).

Для мониторинга apache kafka используется https://github.com/yahoo/kafka-manager.

 

Метрика:

upd для тех кто мало что понял мануал для dot net от ms, смотреть в общие принципы и идеи

 

67 Comments

  1. TreeDogNight

    С помощью какой программы снимали GIF ?

    Reply
  2. s_vidyakin

    ну хоть внешнюю компоненту-то выложите ((

    Reply
  3. KroVladS

    Статья для того чтобы похвастаться?

    Примеры чтобы самим потестить где?

    Reply
  4. s_vidyakin

    Чтобы скомпилить одну ДЛЛ-ку надо поставить 2 гигабайтную студию. ОК.

    Запускаю билд примера с ИТС — 211 ошибок при сборке. WTF??

    Reply
  5. s_vidyakin

    kafka-manager как поставить? там в описании указано что надо ставить Play Framework и компилить бинарник через sbt. Так?

    Reply
  6. KroVladS

    (1)

    С помощью какой программы снимали GIF ?

    Судя по ватермаркам автор пользовался Icecream Screen Recorder

    мне больше нравится LICEcap

    Reply
  7. minimajack

    Какова производительность отправки?

    Reply
  8. s_vidyakin

    фух, поставил зукипер, кафку, кафкоманагер. Осталось компоненту внешнюю получить!

    Reply
  9. minimajack

    (8) не забудьте про питон и скрипты запуска вебсервера

    Reply
  10. comol

    Так я не понял со стороны 1С сообщения получаются на tcp сервер? т.е. на стороне 1С должна быть поднята ВК с tcp сервером, так?

    Reply
  11. dmarenin

    (1) Icecream Screen Recorder дальше в гугле convert web m to gif

    Reply
  12. dmarenin

    (3) примеры чего?

    Reply
  13. dmarenin

    (5) нет есть путь короче имя ему докер хаб

    Reply
  14. dmarenin

    (10) нет. не сервер, tcp клиент. в статье написано.

    Reply
  15. dmarenin

    (9) всё есть в статье

    Reply
  16. dmarenin

    (8) дак там ничего в ней нет. сишный tcp клиент под интерфейсом addin 1c. ну если сильно нужно то конечно могу приложить

    Reply
  17. dmarenin

    (16) код tcp клиента есть в статье. вообще на этом форуме возможно их есть с десяток.

    Reply
  18. dmarenin

    (7) в сравнении с чем? на гифках видно думаю при отправке 500 постов. с учетом того что штатными средствами (без использования бд) межсеансовая клиентская передача не возможна, считаю ваш вопрос не корректным. но суть не сколько во межсеансовой передаче, сколько во внешнем воздействии (события)

    Reply
  19. dmarenin

    (4) ну дак вин сдк нужно для версии длл, иначе как?

    Reply
  20. s_vidyakin

    (19) Классно, как раз хотел закачать еще 3 ГБ библиотек и почпокаться с С++ первый раз в жизни

    Reply
  21. dmarenin

    (20) больше 3х если для крос

    Reply
  22. s_vidyakin

    (13) 6 тысяч результатов по «kafka», в первых нескольких нет инфы, что в них есть zkeeper, kafka-manager. Опять нет конкретики

    Reply
  23. dmarenin
  24. minimajack

    (18) сколько пакетов в секунду размером в 1 байт отправляется

    Reply
  25. dmarenin

    (24) сколько пакетов отправляется откуда?

    Reply
  26. minimajack

    (25) в кафку с клиента

    Reply
  27. comol

    (14)

    Меня смутили:

    1с обрабатывает внешние событие

    внешнее событие генерит компонента?

    tcp сервер отпраляет в сокет всем клиентам

    клиенты же отправляют…

    Reply
  28. dmarenin

    (24) там все над сишные, крестовые поделки(питон(консумер, продакшен кафка), 1с аддин), отсюда скорость.(ну да ртос далеко, ну так реалтайм до 1с, дальше конечно просадка). более 100 сокетов.сразу. к чему такой вопрос?

    Reply
  29. dmarenin

    (27) генерит компонента? да. клиенты же отправляют не всегда, в чем суть то и есть. поставщиком может быть как 1с так и не 1с. и клиенты чего?? в статье же разделение логики вроде как описывалось. есть часть которая отвечает только за отправку в сокеты для 1с, есть часть которая генерирует эти события, есть часть которая генерирует события которые нужно в сокет отправить. объяснил?

    Reply
  30. s_vidyakin

    (23) Было бы круто сделать свой докер со всеми упомянутыми средствами — zkeeper, kafka, kafka-man, flask, и одной командой разворачивать всю схему

    Reply
  31. dmarenin

    (2) добавил

    Reply
  32. dmarenin

    (30) zkeeper, kafka, kafka-man, flask, …. windows, 1c, excel, word, doom…

    а если по делу суть контейнеров как раз так и заключается в разделении приложения

    те.

    вот есть кафка

    есть кафка менеджер

    есть бизнес поделки

    вот они как бы вместе взаимодействуют.

    оркестр

    это мое мнение, и вот так вот мы делаем.

    flask вообще пакет питона, ставится одной строкой в докер бизнес приложения(если в имидже его не было).

    Reply
  33. dmarenin

    (26) добавил гифку по скорости передачи, там где с браузера передача идет (последняя гифка). это под рабочей нагрузкой.

    Reply
  34. minimajack

    (33) господи….цифру напишите — потолок.

    Например отправляется 200 сообщений в секунду. Пиковая нагрузка полторы тыщи.

    Reply
  35. minimajack

    (28) как можно по гифке можно оценить скорость? Вот 100 сокетов — это уже полезно…значит на 100 пользователях возможно не упадет =)

    Хочу оценить порядок скорости со своим велосипедом.

    Reply
  36. dmarenin

    (35) должно упасть? если и упадет кафка доставит чуть позже. см идеологию

    Reply
  37. dmarenin

    (34) потолок чего? отказа?

    Reply
  38. minimajack

    (36) хорошо…

    У меня стоит двадцать датчиков которые генерируют 100 событий в секунду. Смогу ли я отправлять в кафку 2000 сообщений из клиента 1С? А если да — сколько я могу поставить еще датчиков что бы уперется в предел отправки?

    Reply
  39. dmarenin

    (36) по гифке? визуально явно там не байты ходят. вам метрика нужна? я могу замерить, потом окажется у вас железо не то, деплой не тот, нагрузка не та., и тд…

    Reply
  40. dmarenin

    (38) не упрется в том то и дело! что упорядоченная очередь. те как там в 1с прийдет дело 3 тье, главное их в стэк очередь. и если честно генерацию датчиков 2000 в сек наверно лучше не делать из 1с. в моем примере отправка 500 постов в первом случае, 5000 в другом на скринах, не упало.

    Reply
  41. s_vidyakin

    а потянет кафка перекидывания 1..2-х мегабайтными XML-ями?

    Reply
  42. s_vidyakin

    Еще вопрос, как можно в реальном времени смотреть за нагрузкой/очередями в кафке?

    Reply
  43. minimajack

    (39)

    явно там не байты ходят. вам метрика нужна? я могу замерить, потом окажется у вас железо не то, деплой не тот, нагрузка не та

    да какая разница какое железо…там в 100% упрется в 1С. По гифке 1 сообщение в секунду =)

    Reply
  44. dmarenin

    (41) потянет 100% (тут вопрос не кафке уже а в железе и оси), но лучше не так. создать файл, сделать ивент, получить ивент типа прочитай. (типа выставил указатель), получил ответ снял с регистрации, и тд. это как бы и есть событийная логика. а не гонять данные. на крайняк кафка может в стримы.

    Reply
  45. dmarenin

    (42) да на скринах есть(нет передачи потому что нужно включить логи). прям реал тайм график построить кафка манеджер не сможет, можно самому запилить там делов… еще вопросы?

    Reply
  46. dmarenin

    (43) да упрется в 1с так как карусельная виртуальная машина(выполнение потоков по карусели)

    Reply
  47. dmarenin

    (46) почему и высокую нагрузку в обход 1с и предлагал делать, а там куда запишет и как не столь важно. наверно имеет смысл после кафки писать в дб сразу. тогда показания будут корректными. на 1с только отображать

    Reply
  48. minimajack

    Вы нагрузочное тестирование вообще проводили?

    Reply
  49. s_vidyakin

    (45) Как все это запускается? У вас все сервисы в докере? Вроде можно композером объединить все в одну кучу?

    И кстати tcp-сервер на питоне надо в виде сервиса завернуть? или как запускать чтоб постоянно работал?

    Reply
  50. dmarenin

    (49) в скрипте работает постоянно. ибо main. в статье разные приложения. даже расписано что за чо отвечает. ключевое слово «шаг»

    ответ на вопрос » кстати tcp-сервер на питоне надо в виде сервиса завернуть?» screen

    Reply
  51. dmarenin

    (48) отправки 20000 в сек сообщений с 2000 клиентов из 1с. конечно нет. а смысл ? в статье нет речи о посторении скады. тесты были для бизнес приложений и взаимодействия 1с. написано выше было более 100 открытых сокетов и пример передачи. какой вопрос? вы что хотите цифры? дак разверните у себя сделайте замер…

    Reply
  52. minimajack

    (51) а смысл кафки в вашем контексте? 100 открытых сокетов держит древний селерон с гигом оперативы…а 1 сообщение в секунду можно и в sqllite писать и читать…

    Нагнать хайпа ? ))

    Reply
  53. dmarenin

    (52) смысл во внешнем воздействии сторонних приложений и обработкой в 1с. обработка ивента например на обновление формы, реакция на смену статуса звонка, реакция межсеансового события 1с клиента. sqllite писать и читать это возможно ваш подход, не мой. прочитайте статью от начала. просьба.

    Reply
  54. s_vidyakin

    (50) то что постоянно работает я понял, я имею в виду как сделать автостарт и фоновую работу всех служб. В винде можно специальной софтиной из любой программы сделать обычную службу. У вас все эти сервисы на линуксе как демоны или както по другому?

    Reply
  55. dmarenin

    (54) «У вас все эти сервисы» (он один контейнер из статьи) «на линуксе» -да, «как демоны» — нет. в винде батник нормально сработает. в дебе в полне себе баш сработает вида:

    screen -dmUS queue выпелено

    screen -dmUS server python3.6 /OnesSocketServer/OnesSocketServer/ones_socket_server.py

    screen -dmUS proxy python3.6 /OnesSocketServer/ProxyOnesSocketServer/runserver.py

    screen -dmUS consumer python3.6 /OnesSocketServer/ProxyOnesSocketServer/ones_socket_consumer­.py — это часть из статьи вынесенная отдельно для генерации событий отправки в сокет, можно так не делать, можно делать…

    Reply
  56. minimajack

    Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером)…что и где там дальше по цепочке впринцепе не важно…меняем кафку на реббит или базу данных — и ничего для 1С не меняется.

    Вот в чем вопрос…почему кафка?? а ответа нет.

    Так варианты:

    1С + webserver + кафка

    1C + регистр сведений + чтение периодическое

    1С + native api + rabbit mq

    Reply
  57. dmarenin

    (56) ну давайте не будем про 1C + регистр сведений + чтение периодическое

    1С + native api + rabbit mq

    что за native сможет в туда и сюда(клиент и сервер)?

    Reply
  58. dmarenin

    (57) есть сорсы(крестовые) для натс под 1с(натив вк) стоит ли мерятся? не вижу смысла.

    Reply
  59. dmarenin

    (56) «Насколько я вижу взаимодействие происходит с вебсервером(TCP-сервером) » ну да дальше то сокет уже открыт, считай лонг пол для сторонних приложений. бля, до сих пор не понимаю. в чем вопрос то??

    Reply
  60. minimajack

    (57)

    Вместо метрики — gif.

    Вместо внятного объяснения выбора кафки — хрень с натсом.

    Нормально бы написали:

    Пользовался тем то — устраивало

    Пользовался тем то — стало лучше

    Пользовался тем-то — еще лучше и стабильней

    Внедрил кафку — ваще огонь…

    p.s. Yellow RabbitMQ — вроде может и отправлять и принимать, и на сервере и на клиенте. Если что я не от них =)

    Reply
  61. minimajack

    (58) а толку? Снова же: конфигурация не та…деплой говно, метрику не дам, нагрузочного тестирования не делал =))))

    Reply
  62. dmarenin

    (60) метрика:

    Reply
  63. dmarenin

    (60) еще метрика

    Reply
  64. dmarenin

    (60) еще метрика:

    Хватит метрик? или еще добавить?

    Reply
  65. dmarenin

    (61) «конфигурация не та…деплой говно» — возможно, это будут ваши первые слова когда наши метрики не совпадут и у вас будет ниже, «метрику не дам» — дам,

    «нагрузочного тестирования не делал» — вероятно делал

    Reply
  66. minimajack

    Спасибо!

    Reply
  67. starik-2005

    (4) бедняги подвиндозные )))

    Reply

Leave a Comment

Ваш адрес email не будет опубликован. Обязательные поля помечены *