# 
# Simple FLAC decoder (Python)
# 
# Copyright (c) 2020 Project Nayuki. (MIT License)
# https://www.nayuki.io/page/simple-flac-implementation
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
# - The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
# - The Software is provided "as is", without warranty of any kind, express or
#   implied, including but not limited to the warranties of merchantability,
#   fitness for a particular purpose and noninfringement. In no event shall the
#   authors or copyright holders be liable for any claim, damages or other
#   liability, whether in an action of contract, tort or otherwise, arising from,
#   out of or in connection with the Software or the use or other dealings in the
#   Software.
#

import pathlib
import struct
import sys


def main(argv):
    """Decode the FLAC file named by argv[1] into the WAV file named by argv[2].

    Exits with a usage message when the argument count is not exactly three
    (program name, input path, output path).
    """
    if len(argv) != 3:
        sys.exit(f"Usage: python {argv[0]} InFile.flac OutFile.wav")
    with BitInputStream(pathlib.Path(argv[1]).open("rb")) as inp:
        with pathlib.Path(argv[2]).open("wb") as out:
            decode_file(inp, out)


def decode_file(inp, out):
    """Read a whole FLAC stream from `inp` and write it to `out` as PCM WAV.

    `inp` is a BitInputStream positioned at the start of the stream; `out` is
    a writable binary file object.

    Raises:
        ValueError: bad magic number, missing stream info block, or any
            malformed frame encountered while decoding.
        RuntimeError: the sample depth is not a whole number of bytes.
        EOFError: the stream ends in the middle of a field.
    """
    # Handle FLAC header and metadata blocks
    if inp.read_uint(32) != 0x664C6143:  # ASCII "fLaC"
        raise ValueError("Invalid magic string")
    samplerate = None
    last = False
    while not last:
        last = inp.read_uint(1) != 0
        blocktype = inp.read_uint(7)
        length = inp.read_uint(24)
        if blocktype == 0:  # Stream info block
            # Four fields are read and discarded (in the FLAC STREAMINFO
            # layout: min/max block size, min/max frame size).
            inp.read_uint(16)
            inp.read_uint(16)
            inp.read_uint(24)
            inp.read_uint(24)
            samplerate = inp.read_uint(20)
            numchannels = inp.read_uint(3) + 1
            sampledepth = inp.read_uint(5) + 1
            numsamples = inp.read_uint(36)
            inp.read_uint(128)  # Trailing 128-bit field (MD5 in the spec), ignored
        else:
            # Any other metadata block is skipped byte by byte
            for _ in range(length):
                inp.read_uint(8)
    if samplerate is None:
        raise ValueError("Stream info metadata block absent")
    if sampledepth % 8 != 0:
        raise RuntimeError("Sample depth not supported")

    # Start writing WAV file headers
    # NOTE(review): everything from the first struct.pack() call below to the
    # end of this function was unreadable in the source as received; it is
    # reconstructed as the canonical 44-byte PCM WAV header followed by the
    # frame-decoding loop. Confirm against the upstream file.
    bytespersample = sampledepth // 8
    sampledatalen = numsamples * numchannels * bytespersample
    out.write(b"RIFF")
    out.write(struct.pack("<I", sampledatalen + 36))
    out.write(b"WAVE")
    out.write(b"fmt ")
    out.write(struct.pack(
        "<IHHIIHH",
        16,                                        # Size of the fmt chunk body
        0x0001,                                    # Format tag: integer PCM
        numchannels,
        samplerate,
        samplerate * numchannels * bytespersample,  # Byte rate
        numchannels * bytespersample,               # Block align
        sampledepth,
    ))
    out.write(b"data")
    out.write(struct.pack("<I", sampledatalen))

    # Decode FLAC audio frames and write raw samples
    while decode_frame(inp, numchannels, sampledepth, out):
        pass


def decode_frame(inp, numchannels, sampledepth, out):
    """Decode one audio frame from `inp` and append its samples to `out`.

    Returns False if the input is already at end of file (no frame decoded),
    True after a frame has been decoded and written.

    Raises:
        ValueError: missing sync code, reserved block size code, or any
            reserved value met while decoding the subframes.
        EOFError: the stream ends inside the frame.
    """
    # Read a ton of header fields, and ignore most of them
    # NOTE(review): the header parsing down to the `while temp >= ...` loop
    # condition was unreadable in the source as received; it is reconstructed
    # from the FLAC frame header layout and from the variables the surviving
    # code uses (blocksizecode, sampleratecode, chanasgn). Confirm against
    # the upstream file.
    temp = inp.read_byte()
    if temp == -1:
        return False
    sync = temp << 6 | inp.read_uint(6)
    if sync != 0x3FFE:
        raise ValueError("Sync code expected")

    inp.read_uint(1)
    inp.read_uint(1)
    blocksizecode = inp.read_uint(4)
    sampleratecode = inp.read_uint(4)
    chanasgn = inp.read_uint(4)
    inp.read_uint(3)
    inp.read_uint(1)

    # Skip the variable-length frame/sample number: each leading 1 bit after
    # the first in the lead byte announces one continuation byte.
    temp = inp.read_uint(8)
    while temp >= 0b11000000:
        inp.read_uint(8)
        temp = (temp << 1) & 0xFF

    if blocksizecode == 1:
        blocksize = 192
    elif 2 <= blocksizecode <= 5:
        blocksize = 576 << (blocksizecode - 2)
    elif blocksizecode == 6:
        blocksize = inp.read_uint(8) + 1
    elif blocksizecode == 7:
        blocksize = inp.read_uint(16) + 1
    elif 8 <= blocksizecode <= 15:
        blocksize = 256 << (blocksizecode - 8)
    else:
        # Code 0 is the only value left; without this branch `blocksize`
        # would be unbound and surface later as an UnboundLocalError.
        raise ValueError("Reserved block size")

    # The sample rate itself is not needed here; only skip its extra bytes
    if sampleratecode == 12:
        inp.read_uint(8)
    elif sampleratecode in (13, 14):
        inp.read_uint(16)

    inp.read_uint(8)  # Header checksum byte, read but not verified

    # Decode each channel's subframe, then skip footer
    samples = decode_subframes(inp, blocksize, sampledepth, chanasgn)
    inp.align_to_byte()
    inp.read_uint(16)  # Frame checksum, read but not verified

    # Write the decoded samples
    # NOTE(review): the packing expression was unreadable in the source as
    # received; reconstructed as little-endian output truncated to `numbytes`
    # (the only use of `numbytes`/`addend` that fits). Confirm against upstream.
    numbytes = sampledepth // 8
    addend = 128 if sampledepth == 8 else 0  # 8-bit WAV samples are unsigned
    pack = struct.Struct("<i").pack
    framedata = bytearray()
    for i in range(blocksize):
        for j in range(numchannels):
            framedata += pack(samples[j][i] + addend)[:numbytes]
    out.write(framedata)  # One write per frame instead of one per sample
    return True


def decode_subframes(inp, blocksize, sampledepth, chanasgn):
    """Decode all subframes of one frame and undo inter-channel decorrelation.

    Returns a list of per-channel sample lists, each `blocksize` long.

    Raises:
        ValueError: `chanasgn` is a reserved channel assignment (11..15).
    """
    # NOTE(review): the head of this function (independent channels,
    # left/side and side/right branches, start of the mid/side branch) was
    # unreadable in the source as received. It is reconstructed from the
    # surviving mid/side tail and the FLAC channel-assignment codes.
    # Confirm against the upstream file.
    if 0 <= chanasgn <= 7:
        # Independent channels: chanasgn + 1 subframes, no decorrelation
        return [decode_subframe(inp, blocksize, sampledepth)
                for _ in range(chanasgn + 1)]
    elif 8 <= chanasgn <= 10:
        # Stereo with a side (difference) channel, which needs one extra bit.
        # In side/right (9) the side channel comes first; otherwise second.
        temp0 = decode_subframe(
            inp, blocksize, sampledepth + (1 if chanasgn == 9 else 0))
        temp1 = decode_subframe(
            inp, blocksize, sampledepth + (0 if chanasgn == 9 else 1))
        if chanasgn == 8:  # left, side -> right = left - side
            for i in range(blocksize):
                temp1[i] = temp0[i] - temp1[i]
        elif chanasgn == 9:  # side, right -> left = side + right
            for i in range(blocksize):
                temp0[i] += temp1[i]
        else:  # chanasgn == 10: mid, side
            for i in range(blocksize):
                side = temp1[i]
                right = temp0[i] - (side >> 1)
                temp1[i] = right
                temp0[i] = right + side
        return [temp0, temp1]
    else:
        raise ValueError("Reserved channel assignment")


def decode_subframe(inp, blocksize, sampledepth):
    """Decode a single channel's subframe into a list of `blocksize` samples.

    Raises:
        ValueError: the subframe type code is reserved.
    """
    inp.read_uint(1)  # Leading bit is read and ignored
    subtype = inp.read_uint(6)
    # Shift count: a flag bit, then (if set) a unary-coded extension that
    # counts 0 bits up to the terminating 1 bit.
    shift = inp.read_uint(1)
    if shift == 1:
        while inp.read_uint(1) == 0:
            shift += 1
    sampledepth -= shift

    if subtype == 0:  # Constant coding
        result = [inp.read_signed_int(sampledepth)] * blocksize
    elif subtype == 1:  # Verbatim coding
        result = [inp.read_signed_int(sampledepth) for _ in range(blocksize)]
    elif 8 <= subtype <= 12:
        result = decode_fixed_prediction_subframe(
            inp, subtype - 8, blocksize, sampledepth)
    elif 32 <= subtype <= 63:
        result = decode_linear_predictive_coding_subframe(
            inp, subtype - 31, blocksize, sampledepth)
    else:
        raise ValueError("Reserved subframe type")
    # Re-apply the shift that was removed from the coded sample depth
    return [(v << shift) for v in result]


def decode_fixed_prediction_subframe(inp, predorder, blocksize, sampledepth):
    """Decode a subframe coded with the fixed predictor of order `predorder` (0..4)."""
    # The first `predorder` samples are stored verbatim as warm-up
    result = [inp.read_signed_int(sampledepth) for _ in range(predorder)]
    decode_residuals(inp, blocksize, result)
    restore_linear_prediction(result, FIXED_PREDICTION_COEFFICIENTS[predorder], 0)
    return result


# Predictor coefficients indexed by fixed-prediction order 0..4
FIXED_PREDICTION_COEFFICIENTS = (
    (),
    (1,),
    (2, -1),
    (3, -3, 1),
    (4, -6, 4, -1),
)


def decode_linear_predictive_coding_subframe(inp, lpcorder, blocksize, sampledepth):
    """Decode a subframe coded with explicit LPC coefficients of order `lpcorder`."""
    # The first `lpcorder` samples are stored verbatim as warm-up
    result = [inp.read_signed_int(sampledepth) for _ in range(lpcorder)]
    precision = inp.read_uint(4) + 1
    shift = inp.read_signed_int(5)
    coefs = [inp.read_signed_int(precision) for _ in range(lpcorder)]
    decode_residuals(inp, blocksize, result)
    restore_linear_prediction(result, coefs, shift)
    return result


def decode_residuals(inp, blocksize, result):
    """Read the residual section of a subframe and append the values to `result`.

    On entry `result` holds the warm-up samples; on return it holds
    `blocksize` entries (warm-up followed by residuals).

    Raises:
        ValueError: reserved coding method, or `blocksize` is not divisible
            by the number of partitions.
    """
    method = inp.read_uint(2)
    if method >= 2:
        raise ValueError("Reserved residual coding method")
    parambits = [4, 5][method]
    escapeparam = [0xF, 0x1F][method]

    partitionorder = inp.read_uint(4)
    numpartitions = 1 << partitionorder
    if blocksize % numpartitions != 0:
        raise ValueError("Block size not divisible by number of Rice partitions")

    for i in range(numpartitions):
        count = blocksize >> partitionorder
        if i == 0:
            # The first partition is shortened by the warm-up samples
            count -= len(result)
        param = inp.read_uint(parambits)
        if param < escapeparam:
            result.extend(inp.read_rice_signed_int(param) for _ in range(count))
        else:
            # Escape code: residuals stored as plain signed integers of
            # `numbits` bits each (0 bits means every residual is 0).
            numbits = inp.read_uint(5)
            result.extend(inp.read_signed_int(numbits) for _ in range(count))


def restore_linear_prediction(result, coefs, shift):
    """Turn residuals in `result` into samples, in place.

    Entries before index len(coefs) are left as they are; every later entry
    gets the prediction (dot product of the preceding samples with `coefs`,
    arithmetically shifted right by `shift`) added to it.
    """
    for i in range(len(coefs), len(result)):
        result[i] += sum((result[i - 1 - j] * c) for (j, c) in enumerate(coefs)) >> shift


class BitInputStream:
    """Big-endian bit reader over a binary file object.

    Bits are pulled from the underlying stream one byte at a time into an
    integer buffer and handed out most-significant bit first.
    """

    def __init__(self, inp):
        self.inp = inp
        self.bitbuffer = 0      # Unread bits, right-aligned
        self.bitbufferlen = 0   # Number of valid bits in bitbuffer

    def align_to_byte(self):
        """Discard buffered bits up to the next byte boundary."""
        self.bitbufferlen -= self.bitbufferlen % 8

    def read_byte(self):
        """Return the next byte as an int, or -1 at end of stream.

        When fewer than 8 bits are buffered the byte is taken straight from
        the underlying stream, so callers must be byte-aligned.
        """
        if self.bitbufferlen >= 8:
            return self.read_uint(8)
        result = self.inp.read(1)
        if len(result) == 0:
            return -1
        return result[0]

    def read_uint(self, n):
        """Read `n` bits and return them as an unsigned integer.

        Raises:
            EOFError: the underlying stream runs out before `n` bits are read.
        """
        while self.bitbufferlen < n:
            temp = self.inp.read(1)
            if len(temp) == 0:
                raise EOFError()
            self.bitbuffer = (self.bitbuffer << 8) | temp[0]
            self.bitbufferlen += 8
        self.bitbufferlen -= n
        result = (self.bitbuffer >> self.bitbufferlen) & ((1 << n) - 1)
        self.bitbuffer &= (1 << self.bitbufferlen) - 1  # Drop the consumed bits
        return result

    def read_signed_int(self, n):
        """Read `n` bits and return them as a two's-complement signed integer.

        A width of 0 consumes nothing and yields 0.
        """
        if n == 0:
            # Without this guard the sign extension below would evaluate
            # `temp >> -1`, which Python rejects with ValueError.
            return 0
        temp = self.read_uint(n)
        temp -= (temp >> (n - 1)) << n  # Subtract 2**n when the sign bit is set
        return temp

    def read_rice_signed_int(self, param):
        """Read one Rice-coded signed integer with parameter `param`."""
        val = 0
        while self.read_uint(1) == 0:  # Unary-coded quotient
            val += 1
        val = (val << param) | self.read_uint(param)
        # Zigzag decode: even -> val / 2, odd -> -(val + 1) / 2
        return (val >> 1) ^ -(val & 1)

    def close(self):
        """Close the underlying stream."""
        self.inp.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        self.close()


if __name__ == "__main__":
    main(sys.argv)