Instaloader: Download live video?

Created on 30 Nov 2018 · 4 Comments · Source: instaloader/instaloader

Would it be possible to implement downloading of live videos (those that are saved)?

feature suggestion stale

All 4 comments

Based on igapi@ping.
I cannot test it at the moment because my followees aren't big on live videos, but I've been able to download saved broadcasts in the past.

import json
import os
import subprocess
import xml.etree.ElementTree
from contextlib import closing
from datetime import datetime
from urllib.parse import urlparse

import instaloader
import requests
import requests.adapters

UA = 'Instagram 10.26.0 (iPhone7,2; iOS 10_1_1; en_US; en-US; scale=2.00; gamut=normal; 750x1334) AppleWebKit/420+'
MPD_NAMESPACE = {'mpd': 'urn:mpeg:dash:schema:mpd:2011'}


def get_replay(mpd, output_filename):
    # Copyright (c) 2017 https://github.com/ping
    # Copyright (c) 2018 https://github.com/e5150
    # https://opensource.org/licenses/MIT

    download_timeout = 15
    # Fall back to the current directory when the filename has no directory part
    output_dir = os.path.dirname(output_filename) or '.'
    os.makedirs(output_dir, exist_ok=True)

    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(max_retries=2)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    def _get(src):
        # Download one stream next to the output file; the query string is
        # dropped so that it does not end up in the local filename
        dst = os.path.join(output_dir, os.path.basename(urlparse(src).path))
        print('Downloading {}'.format(src))

        with closing(session.get(
                src,
                headers={'User-Agent': UA, 'Accept': '*/*'},
                timeout=download_timeout, stream=True)) as res:
            res.raise_for_status()

            with open(dst, 'wb') as f:
                print('writing', dst)
                for chunk in res.iter_content(chunk_size=1024*100):
                    f.write(chunk)
        return dst

    def _quality(rep):
        # Rank by resolution, then bandwidth, then audio sampling rate.
        # The key is all integers: mixing in the string FBQualityLabel
        # can raise a TypeError when sorting on Python 3.
        return (
            int(rep.attrib.get('width', '0')) * int(rep.attrib.get('height', '0')),
            int(rep.attrib.get('bandwidth', '0')),
            int(rep.attrib.get('audioSamplingRate', '0')))

    xml.etree.ElementTree.register_namespace('', MPD_NAMESPACE['mpd'])
    mpd_document = xml.etree.ElementTree.fromstring(mpd)

    periods = mpd_document.findall('mpd:Period', MPD_NAMESPACE)
    print('Found {0:d} period(s)'.format(len(periods)))

    generated_files = []

    # According to the spec, multiple periods are allowed, but IG usually sends only one
    for period_idx, period in enumerate(periods):
        adaptation_sets = period.findall('mpd:AdaptationSet', MPD_NAMESPACE)
        audio_stream = None
        video_stream = None
        if len(adaptation_sets) != 2:
            print('Unexpected number of adaptation sets: {}'.format(len(adaptation_sets)))
        for adaptation_set in adaptation_sets:
            representations = adaptation_set.findall('mpd:Representation', MPD_NAMESPACE)
            if not representations:
                continue
            # sort representations by quality and pick the best one
            representations = sorted(representations, key=_quality, reverse=True)
            representation = representations[0]
            representation_id = representation.attrib.get('id', '')
            mime_type = representation.attrib.get('mimeType', '')
            print(
                'Selected representation with mimeType {0!s} id {1!s} out of {2!s}'.format(
                    mime_type,
                    representation_id,
                    ' / '.join([r.attrib.get('id', '') for r in representations])
                ))
            representation_base_url = representation.find('mpd:BaseURL', MPD_NAMESPACE).text
            print(representation_base_url)
            if 'video' in mime_type and not video_stream:
                video_stream = representation_base_url
            elif 'audio' in mime_type and not audio_stream:
                audio_stream = representation_base_url

            if audio_stream and video_stream:
                break

        if not (audio_stream and video_stream):
            print('Period {} lacks an audio or video stream, skipping'.format(period_idx + 1))
            continue

        audio_file = _get(audio_stream)
        video_file = _get(video_stream)

        if len(periods) > 1:
            # Generate a new filename by appending n+1
            # to the original specified output filename
            # so that it looks like output-1.mp4, output-2.mp4, etc
            filename_no_ext, ext = os.path.splitext(output_filename)
            generated_filename = '{0!s}-{1:d}{2!s}'.format(filename_no_ext, period_idx + 1, ext)
        else:
            generated_filename = output_filename

        # Mux the separate audio and video streams into one file without re-encoding
        cmd = [
            'ffmpeg', '-y', '-stats',
            '-loglevel', 'warning',
            '-i', audio_file,
            '-i', video_file,
            '-c:v', 'copy',
            '-c:a', 'copy',
            generated_filename,
        ]
        if subprocess.call(cmd) == 0:
            generated_files.append(generated_filename)

    return generated_files


l = instaloader.Instaloader()
l.load_session_from_file('<--USERNAME-->')  # your username here
# Remove the web-specific headers (if present) and use the app's User-Agent instead
for header in ('Host', 'Origin', 'X-Instagram-AJAX', 'X-Requested-With'):
    l.context._session.headers.pop(header, None)
l.context._session.headers['User-Agent'] = UA
params = {}

data = l.context.get_json(path='api/feed/reels_tray/', host='i.instagram.com', params=params)
with open('.reels_tray.json', 'w') as fp:
    json.dump(data, fp)
for item in data.get('post_live', {}).get('post_live_items', []):
    for bc in item.get('broadcasts', []):
        dash = bc['dash_manifest']
        user = bc['broadcast_owner']['username']
        date = datetime.fromtimestamp(int(bc['published_time']))
        output_file = '/tmp/{}-{:%Y-%m-%d_%H:%M:%S}.mp4'.format(user, date)
        get_replay(dash, output_file)
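
The representation-picking step of get_replay can be tried offline, without a session or network access. The following is only a minimal sketch: the manifest is hand-written (its URLs, ids and attribute values are made up, not real Instagram output), and best_streams and quality are hypothetical helper names. It ranks representations with an all-integer tuple, which is a simplification of the key used in the script.

```python
import xml.etree.ElementTree as ET

MPD_NS = {'mpd': 'urn:mpeg:dash:schema:mpd:2011'}

# Hand-written sample manifest; all URLs and values are hypothetical.
SAMPLE_MPD = """<MPD xmlns="urn:mpeg:dash:schema:mpd:2011">
  <Period>
    <AdaptationSet>
      <Representation id="v-low" mimeType="video/mp4" width="360" height="640" bandwidth="400000">
        <BaseURL>https://example.com/v-low.mp4</BaseURL>
      </Representation>
      <Representation id="v-high" mimeType="video/mp4" width="720" height="1280" bandwidth="1500000">
        <BaseURL>https://example.com/v-high.mp4</BaseURL>
      </Representation>
    </AdaptationSet>
    <AdaptationSet>
      <Representation id="a-1" mimeType="audio/mp4" audioSamplingRate="44100" bandwidth="64000">
        <BaseURL>https://example.com/a-1.mp4</BaseURL>
      </Representation>
    </AdaptationSet>
  </Period>
</MPD>"""


def quality(rep):
    """Rank a Representation by resolution, bandwidth, then audio sampling rate."""
    return (
        int(rep.attrib.get('width', '0')) * int(rep.attrib.get('height', '0')),
        int(rep.attrib.get('bandwidth', '0')),
        int(rep.attrib.get('audioSamplingRate', '0')),
    )


def best_streams(mpd_text):
    """Return {'video': url, 'audio': url} for the first Period of the manifest."""
    root = ET.fromstring(mpd_text)
    period = root.find('mpd:Period', MPD_NS)
    streams = {}
    for adaptation_set in period.findall('mpd:AdaptationSet', MPD_NS):
        reps = adaptation_set.findall('mpd:Representation', MPD_NS)
        if not reps:
            continue
        best = max(reps, key=quality)
        # 'video/mp4' -> 'video', 'audio/mp4' -> 'audio'
        kind = best.attrib.get('mimeType', '').split('/')[0]
        # Keep only the first stream found for each kind
        streams.setdefault(kind, best.find('mpd:BaseURL', MPD_NS).text)
    return streams


print(best_streams(SAMPLE_MPD))
```

With a real manifest, the two URLs returned here would be the inputs that get downloaded and then muxed by ffmpeg.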

I would also love to see this implemented, although there already seem to be other great livestream capturing tools available on GitHub.

For Instagram? Which ones?

You can use https://github.com/taengstagram/instagram-livestream-downloader, which I personally like most because it can convert the comments to subtitles. However, the tool is no longer updated, and downloading the stream sometimes fails.
Another one is https://github.com/notcammy/PyInstaLive, which I use instead. It does the same, but saves the comments as a plain text file rather than as subtitles.

Please note that the two tools have different usage syntax (see their manuals).
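
To illustrate what "comments as subtitles" could look like, here is a rough sketch that writes timed comments in the SubRip (.srt) layout. It is not taken from either tool: the (offset_seconds, username, text) tuple format, the function names and the 3-second display time are all assumptions made for this example.

```python
def srt_timestamp(seconds):
    """Format a time offset in seconds as an SRT timestamp (HH:MM:SS,mmm)."""
    ms = int(round(seconds * 1000))
    hours, ms = divmod(ms, 3600000)
    minutes, ms = divmod(ms, 60000)
    secs, ms = divmod(ms, 1000)
    return '{:02d}:{:02d}:{:02d},{:03d}'.format(hours, minutes, secs, ms)


def comments_to_srt(comments, display_seconds=3.0):
    """Convert (offset_seconds, username, text) tuples into SRT text.

    The tuple layout is hypothetical; real tools store comments differently.
    """
    blocks = []
    for index, (offset, user, text) in enumerate(sorted(comments), start=1):
        blocks.append('{}\n{} --> {}\n{}: {}\n'.format(
            index,
            srt_timestamp(offset),
            srt_timestamp(offset + display_seconds),
            user,
            text))
    # SRT entries are separated by a blank line
    return '\n'.join(blocks)


print(comments_to_srt([(12.5, 'alice', 'hello'), (1.0, 'bob', 'first')]))
```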

There has been no activity on this issue for an extended period of time. This issue will be closed after a further 14 days of inactivity.
